1use crate::StructMode;
137use std::io::BufRead;
138use std::sync::Arc;
139
140use chrono::Utc;
141use serde::Serialize;
142
143use arrow_array::timezone::Tz;
144use arrow_array::types::*;
145use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader, StructArray};
146use arrow_data::ArrayData;
147use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
148pub use schema::*;
149
150use crate::reader::boolean_array::BooleanArrayDecoder;
151use crate::reader::decimal_array::DecimalArrayDecoder;
152use crate::reader::list_array::ListArrayDecoder;
153use crate::reader::map_array::MapArrayDecoder;
154use crate::reader::null_array::NullArrayDecoder;
155use crate::reader::primitive_array::PrimitiveArrayDecoder;
156use crate::reader::string_array::StringArrayDecoder;
157use crate::reader::string_view_array::StringViewArrayDecoder;
158use crate::reader::struct_array::StructArrayDecoder;
159use crate::reader::tape::{Tape, TapeDecoder};
160use crate::reader::timestamp_array::TimestampArrayDecoder;
161
162mod boolean_array;
163mod decimal_array;
164mod list_array;
165mod map_array;
166mod null_array;
167mod primitive_array;
168mod schema;
169mod serializer;
170mod string_array;
171mod string_view_array;
172mod struct_array;
173mod tape;
174mod timestamp_array;
175
/// Builder for [`Reader`] and [`Decoder`], configuring how JSON input is
/// decoded into Arrow [`RecordBatch`]es.
pub struct ReaderBuilder {
    // Maximum number of rows decoded into each emitted `RecordBatch`.
    batch_size: usize,
    // When true, primitive JSON values (numbers, booleans) may be coerced
    // into string columns instead of erroring.
    coerce_primitive: bool,
    // Forwarded to struct-like decoders; presumably rejects JSON fields not
    // present in the schema — confirm exact semantics in StructArrayDecoder.
    strict_mode: bool,
    // True when built via `new_with_field`: input is a stream of values of a
    // single field rather than objects matching a whole schema.
    is_field: bool,
    // Controls how JSON values are mapped onto struct columns.
    struct_mode: StructMode,

    // Target Arrow schema for the decoded batches.
    schema: SchemaRef,
}
186
187impl ReaderBuilder {
188 pub fn new(schema: SchemaRef) -> Self {
197 Self {
198 batch_size: 1024,
199 coerce_primitive: false,
200 strict_mode: false,
201 is_field: false,
202 struct_mode: Default::default(),
203 schema,
204 }
205 }
206
207 pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
238 Self {
239 batch_size: 1024,
240 coerce_primitive: false,
241 strict_mode: false,
242 is_field: true,
243 struct_mode: Default::default(),
244 schema: Arc::new(Schema::new([field.into()])),
245 }
246 }
247
248 pub fn with_batch_size(self, batch_size: usize) -> Self {
250 Self { batch_size, ..self }
251 }
252
253 pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
256 Self {
257 coerce_primitive,
258 ..self
259 }
260 }
261
262 pub fn with_strict_mode(self, strict_mode: bool) -> Self {
268 Self {
269 strict_mode,
270 ..self
271 }
272 }
273
274 pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
278 Self {
279 struct_mode,
280 ..self
281 }
282 }
283
284 pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
286 Ok(Reader {
287 reader,
288 decoder: self.build_decoder()?,
289 })
290 }
291
292 pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
294 let (data_type, nullable) = match self.is_field {
295 false => (DataType::Struct(self.schema.fields.clone()), false),
296 true => {
297 let field = &self.schema.fields[0];
298 (field.data_type().clone(), field.is_nullable())
299 }
300 };
301
302 let decoder = make_decoder(
303 data_type,
304 self.coerce_primitive,
305 self.strict_mode,
306 nullable,
307 self.struct_mode,
308 )?;
309
310 let num_fields = self.schema.flattened_fields().len();
311
312 Ok(Decoder {
313 decoder,
314 is_field: self.is_field,
315 tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
316 batch_size: self.batch_size,
317 schema: self.schema,
318 })
319 }
320}
321
/// Reads JSON with a known schema from a [`BufRead`] source, yielding Arrow
/// [`RecordBatch`]es via its [`Iterator`] implementation.
pub struct Reader<R> {
    // Buffered byte source the JSON is read from.
    reader: R,
    // Push-based decoder that accumulates rows and produces batches.
    decoder: Decoder,
}
329
330impl<R> std::fmt::Debug for Reader<R> {
331 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332 f.debug_struct("Reader")
333 .field("decoder", &self.decoder)
334 .finish()
335 }
336}
337
338impl<R: BufRead> Reader<R> {
339 fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
341 loop {
342 let buf = self.reader.fill_buf()?;
343 if buf.is_empty() {
344 break;
345 }
346 let read = buf.len();
347
348 let decoded = self.decoder.decode(buf)?;
349 self.reader.consume(decoded);
350 if decoded != read {
351 break;
352 }
353 }
354 self.decoder.flush()
355 }
356}
357
358impl<R: BufRead> Iterator for Reader<R> {
359 type Item = Result<RecordBatch, ArrowError>;
360
361 fn next(&mut self) -> Option<Self::Item> {
362 self.read().transpose()
363 }
364}
365
366impl<R: BufRead> RecordBatchReader for Reader<R> {
367 fn schema(&self) -> SchemaRef {
368 self.decoder.schema.clone()
369 }
370}
371
/// A push-based JSON decoder: feed it bytes (or serializable rows) and
/// periodically [`flush`](Decoder::flush) the buffered rows as a `RecordBatch`.
pub struct Decoder {
    // Tokenizes raw JSON bytes into a tape representation.
    tape_decoder: TapeDecoder,
    // Converts the tape into Arrow `ArrayData` for the root type.
    decoder: Box<dyn ArrayDecoder>,
    // Maximum rows per flushed `RecordBatch`.
    batch_size: usize,
    // True when decoding a stream of single-field values rather than objects.
    is_field: bool,
    // Schema attached to every flushed batch.
    schema: SchemaRef,
}
419
420impl std::fmt::Debug for Decoder {
421 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
422 f.debug_struct("Decoder")
423 .field("schema", &self.schema)
424 .field("batch_size", &self.batch_size)
425 .finish()
426 }
427}
428
429impl Decoder {
430 pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
439 self.tape_decoder.decode(buf)
440 }
441
442 pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
617 self.tape_decoder.serialize(rows)
618 }
619
620 pub fn has_partial_record(&self) -> bool {
622 self.tape_decoder.has_partial_row()
623 }
624
625 pub fn len(&self) -> usize {
627 self.tape_decoder.num_buffered_rows()
628 }
629
630 pub fn is_empty(&self) -> bool {
632 self.len() == 0
633 }
634
635 pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
642 let tape = self.tape_decoder.finish()?;
643
644 if tape.num_rows() == 0 {
645 return Ok(None);
646 }
647
648 let mut next_object = 1;
650 let pos: Vec<_> = (0..tape.num_rows())
651 .map(|_| {
652 let next = tape.next(next_object, "row").unwrap();
653 std::mem::replace(&mut next_object, next)
654 })
655 .collect();
656
657 let decoded = self.decoder.decode(&tape, &pos)?;
658 self.tape_decoder.clear();
659
660 let batch = match self.is_field {
661 true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
662 false => {
663 RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
664 }
665 };
666
667 Ok(Some(batch))
668 }
669}
670
/// Decodes one Arrow array type from a parsed JSON [`Tape`].
trait ArrayDecoder: Send {
    /// Decode the rows whose root tape offsets are listed in `pos` into
    /// [`ArrayData`] of this decoder's type.
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
675
/// Shorthand producing `Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))`,
/// used for the many primitive arms of [`make_decoder`].
macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}
681
/// Construct the [`ArrayDecoder`] for `data_type`.
///
/// `coerce_primitive` allows JSON numbers/booleans into string columns,
/// `strict_mode` and `struct_mode` are forwarded to the struct-like decoders,
/// and `is_nullable` is forwarded to nested decoders.
///
/// # Errors
/// Binary types are rejected as unsupported by JSON; other unhandled types
/// return `NotYetImplemented`; invalid timezone strings fail to parse.
fn make_decoder(
    data_type: DataType,
    coerce_primitive: bool,
    strict_mode: bool,
    is_nullable: bool,
    struct_mode: StructMode,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    // `downcast_integer!` expands the first arm into one match arm per Arrow
    // integer type, each calling `primitive_decoder!`; the remaining arms are
    // ordinary match arms on `data_type`.
    downcast_integer! {
        data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
            Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON")))
        }
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader")))
    }
}
749
750#[cfg(test)]
751mod tests {
752 use serde_json::json;
753 use std::fs::File;
754 use std::io::{BufReader, Cursor, Seek};
755
756 use arrow_array::cast::AsArray;
757 use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
758 use arrow_buffer::{ArrowNativeType, Buffer};
759 use arrow_cast::display::{ArrayFormatter, FormatOptions};
760 use arrow_data::ArrayDataBuilder;
761 use arrow_schema::{Field, Fields};
762
763 use super::*;
764
765 fn do_read(
766 buf: &str,
767 batch_size: usize,
768 coerce_primitive: bool,
769 strict_mode: bool,
770 schema: SchemaRef,
771 ) -> Vec<RecordBatch> {
772 let mut unbuffered = vec![];
773
774 for batch_size in [1, 3, 100, batch_size] {
776 unbuffered = ReaderBuilder::new(schema.clone())
777 .with_batch_size(batch_size)
778 .with_coerce_primitive(coerce_primitive)
779 .build(Cursor::new(buf.as_bytes()))
780 .unwrap()
781 .collect::<Result<Vec<_>, _>>()
782 .unwrap();
783
784 for b in unbuffered.iter().take(unbuffered.len() - 1) {
785 assert_eq!(b.num_rows(), batch_size)
786 }
787
788 for b in [1, 3, 5] {
790 let buffered = ReaderBuilder::new(schema.clone())
791 .with_batch_size(batch_size)
792 .with_coerce_primitive(coerce_primitive)
793 .with_strict_mode(strict_mode)
794 .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
795 .unwrap()
796 .collect::<Result<Vec<_>, _>>()
797 .unwrap();
798 assert_eq!(unbuffered, buffered);
799 }
800 }
801
802 unbuffered
803 }
804
    // Covers basic primitive decoding: out-of-order keys, missing keys,
    // blank lines, null handling, and numeric coercion between JSON forms.
    #[test]
    fn test_basic() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Drive the push-based Decoder directly first
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        // 221 bytes = the full input consumed in one call
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
871
    // Covers Utf8/LargeUtf8 decoding including escape sequences, surrogate
    // pairs, unicode escapes, nulls and empty strings.
    #[test]
    fn test_string() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        // Surrogate pair + \uXXXX escapes decode to the actual characters
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }
907
    // Verifies that Utf8View decoding allocates a data buffer at least large
    // enough for all strings that exceed the inline (12-byte) view threshold.
    #[test]
    fn test_long_string_view_allocation() {
        // Sum of the byte lengths of the values too long to inline
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Buffer 0 of a StringViewArray holds the out-of-line string bytes
        let data_buffer = string_view_array.to_data().buffers()[0].len();

        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({}) should be at least {}",
            data_buffer,
            expected_capacity
        );

        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }
962
    // Verifies Utf8View buffer sizing when numbers are coerced to strings.
    #[test]
    fn test_numeric_view_allocation() {
        // Combined length of the coerced string representations that must be
        // stored out-of-line.
        let expected_capacity: usize = 33;

        let buf = r#"
        {"n": 123456789}
        {"n": 1000000000000}
        {"n": 3.1415}
        {"n": 2.718281828459045}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        // coerce_primitive = true: numbers land in the string view column
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({}) should be at least {}",
            data_buffer,
            expected_capacity
        );

        // Coerced values preserve the original JSON text exactly
        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1009
    // Same coverage as test_string, but with column "a" as Utf8View.
    // NOTE(review): "uft8" in the name is a long-standing typo kept for history.
    #[test]
    fn test_string_with_uft8view() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string_view();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert_eq!(col1.data_type(), &DataType::Utf8View);

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }
1046
1047 #[test]
1048 fn test_complex() {
1049 let buf = r#"
1050 {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
1051 {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1052 {"list": null, "nested": {"a": null}}
1053 "#;
1054
1055 let schema = Arc::new(Schema::new(vec![
1056 Field::new_list("list", Field::new("element", DataType::Int32, false), true),
1057 Field::new_struct(
1058 "nested",
1059 vec![
1060 Field::new("a", DataType::Int32, true),
1061 Field::new("b", DataType::Int32, true),
1062 ],
1063 true,
1064 ),
1065 Field::new_struct(
1066 "nested_list",
1067 vec![Field::new_list(
1068 "list2",
1069 Field::new_struct(
1070 "element",
1071 vec![Field::new("c", DataType::Int32, false)],
1072 false,
1073 ),
1074 true,
1075 )],
1076 true,
1077 ),
1078 ]));
1079
1080 let batches = do_read(buf, 1024, false, false, schema);
1081 assert_eq!(batches.len(), 1);
1082
1083 let list = batches[0].column(0).as_list::<i32>();
1084 assert_eq!(list.len(), 3);
1085 assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
1086 assert_eq!(list.null_count(), 1);
1087 assert!(list.is_null(2));
1088 let list_values = list.values().as_primitive::<Int32Type>();
1089 assert_eq!(list_values.values(), &[5, 6]);
1090
1091 let nested = batches[0].column(1).as_struct();
1092 let a = nested.column(0).as_primitive::<Int32Type>();
1093 assert_eq!(list.null_count(), 1);
1094 assert_eq!(a.values(), &[1, 7, 0]);
1095 assert!(list.is_null(2));
1096
1097 let b = nested.column(1).as_primitive::<Int32Type>();
1098 assert_eq!(b.null_count(), 2);
1099 assert_eq!(b.len(), 3);
1100 assert_eq!(b.value(0), 2);
1101 assert!(b.is_null(1));
1102 assert!(b.is_null(2));
1103
1104 let nested_list = batches[0].column(2).as_struct();
1105 assert_eq!(nested_list.len(), 3);
1106 assert_eq!(nested_list.null_count(), 1);
1107 assert!(nested_list.is_null(2));
1108
1109 let list2 = nested_list.column(0).as_list::<i32>();
1110 assert_eq!(list2.len(), 3);
1111 assert_eq!(list2.null_count(), 1);
1112 assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
1113 assert!(list2.is_null(2));
1114
1115 let list2_values = list2.values().as_struct();
1116
1117 let c = list2_values.column(0).as_primitive::<Int32Type>();
1118 assert_eq!(c.values(), &[3, 4]);
1119 }
1120
    // Verifies that a schema narrower than the input projects away the unread
    // JSON fields, including inside nested structs and lists of structs.
    #[test]
    fn test_projection() {
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // "d" is absent in the second element, so it decodes as null
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1176
    // Covers Map decoding with list-valued entries, null values, and
    // round-trips the result through the display formatter.
    #[test]
    fn test_map() {
        let buf = r#"
        {"map": {"a": ["foo", null]}}
        {"map": {"a": [null], "b": []}}
        {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // One entry in row 0, two in row 1, two in row 2
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1219
    // Without with_coerce_primitive, numbers and booleans must NOT decode
    // into string columns — the reader should error instead.
    #[test]
    fn test_not_coercing_primitive_into_string_without_flag() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));

        let buf = r#"{"a": 1}"#;
        let err = ReaderBuilder::new(schema.clone())
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got 1"
        );

        let buf = r#"{"a": true}"#;
        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got true"
        );
    }
1250
    // With with_coerce_primitive(true), numbers and booleans decode into
    // string columns, preserving the original JSON text verbatim.
    #[test]
    fn test_coercing_primitive_into_string() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        // Coercion keeps the source text, not a normalized number
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1299
    // Generic helper: decodes decimals given as numbers and as strings into a
    // Decimal column with scale 2 (so e.g. 38.30 stores as 3830).
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1349
    // Run the decimal helper for both 128- and 256-bit decimal types.
    #[test]
    fn test_decimals() {
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1355
    // Generic helper: decodes timestamps from RFC3339 strings (with and
    // without timezone), space-separated forms, date-only forms, and raw
    // numbers, for each timestamp unit T.
    fn test_timestamp<T: ArrowTimestampType>() {
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        // Column "d" carries an explicit +08:00 zone; timezone-less strings
        // read into it are interpreted in that zone.
        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Expected values below are expressed in nanoseconds, then scaled
        // down to the unit under test.
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1437
    // Run the timestamp helper for all four timestamp units.
    #[test]
    fn test_timestamps() {
        test_timestamp::<TimestampSecondType>();
        test_timestamp::<TimestampMillisecondType>();
        test_timestamp::<TimestampMicrosecondType>();
        test_timestamp::<TimestampNanosecondType>();
    }
1445
    // Generic helper: decodes times-of-day from AM/PM and 24h strings, with
    // and without fractional seconds, plus raw numbers, for time type T.
    fn test_time<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Expected values below are nanoseconds-since-midnight, scaled down
        // to the unit under test.
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1517
    // Runs the generic time test against all four Time32/Time64 types.
    #[test]
    fn test_times() {
        test_time::<Time32MillisecondType>();
        test_time::<Time32SecondType>();
        test_time::<Time64MicrosecondType>();
        test_time::<Time64NanosecondType>();
    }
1525
    // Generic driver for duration decoding: both quoted ("2") and bare (1, 3)
    // numbers must decode into duration columns of type `T`, and a JSON null
    // must produce a null slot.
    fn test_duration<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "2"}
        {"a": 3, "b": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
        assert_eq!(col_a.null_count(), 0);
        assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));

        // The null slot keeps the default value (0) behind the null mask.
        let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
    }
1548
    // Runs the generic duration test against all four duration granularities.
    #[test]
    fn test_durations() {
        test_duration::<DurationNanosecondType>();
        test_duration::<DurationMicrosecondType>();
        test_duration::<DurationMillisecondType>();
        test_duration::<DurationSecondType>();
    }
1556
    // Modelled on a Delta Lake checkpoint record: the schema declares two
    // top-level structs but the input only provides "protocol", so "add"
    // (a struct wrapping a map) must decode as a null struct.
    #[test]
    fn test_delta_checkpoint() {
        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "protocol",
                vec![
                    Field::new("minReaderVersion", DataType::Int32, true),
                    Field::new("minWriterVersion", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "add",
                vec![Field::new_map(
                    "partitionValues",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                    false,
                    false,
                )],
                true,
            ),
        ]));

        let batches = do_read(json, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Render the single row as text; the absent "add" struct prints as
        // the configured null string.
        let s: StructArray = batches.into_iter().next().unwrap().into();
        let opts = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
        assert_eq!(
            formatter.value(0).to_string(),
            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
        );
    }
1594
1595 #[test]
1596 fn struct_nullability() {
1597 let do_test = |child: DataType| {
1598 let non_null = r#"{"foo": {}}"#;
1600 let schema = Arc::new(Schema::new(vec![Field::new_struct(
1601 "foo",
1602 vec![Field::new("bar", child, false)],
1603 true,
1604 )]));
1605 let mut reader = ReaderBuilder::new(schema.clone())
1606 .build(Cursor::new(non_null.as_bytes()))
1607 .unwrap();
1608 assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": {bar: null}}"#;
1611 let mut reader = ReaderBuilder::new(schema.clone())
1612 .build(Cursor::new(null.as_bytes()))
1613 .unwrap();
1614 assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": null}"#;
1618 let mut reader = ReaderBuilder::new(schema)
1619 .build(Cursor::new(null.as_bytes()))
1620 .unwrap();
1621 let batch = reader.next().unwrap().unwrap();
1622 assert_eq!(batch.num_columns(), 1);
1623 let foo = batch.column(0).as_struct();
1624 assert_eq!(foo.len(), 1);
1625 assert!(foo.is_null(0));
1626 assert_eq!(foo.num_columns(), 1);
1627
1628 let bar = foo.column(0);
1629 assert_eq!(bar.len(), 1);
1630 assert!(bar.is_null(0));
1632 };
1633
1634 do_test(DataType::Boolean);
1635 do_test(DataType::Int32);
1636 do_test(DataType::Utf8);
1637 do_test(DataType::Decimal128(2, 1));
1638 do_test(DataType::Timestamp(
1639 TimeUnit::Microsecond,
1640 Some("+00:00".into()),
1641 ));
1642 }
1643
    // i64::MAX/MIN and u64::MAX/MIN must round-trip exactly, whether they
    // appear as JSON numbers or as quoted strings (coerce_primitive = true),
    // i.e. no intermediate f64 conversion may truncate them.
    #[test]
    fn test_truncation() {
        let buf = r#"
        {"i64": 9223372036854775807, "u64": 18446744073709551615 }
        {"i64": "9223372036854775807", "u64": "18446744073709551615" }
        {"i64": -9223372036854775808, "u64": 0 }
        {"i64": "-9223372036854775808", "u64": 0 }
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("i64", DataType::Int64, true),
            Field::new("u64", DataType::UInt64, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let i64 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);

        let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
        assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
    }
1667
    // Extreme i64 values and scientific notation (9e5) must decode into a
    // nanosecond timestamp column without overflow or precision loss.
    #[test]
    fn test_timestamp_truncation() {
        let buf = r#"
        {"time": 9223372036854775807 }
        {"time": -9223372036854775808 }
        {"time": 9e5 }
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        )]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let i64 = batches[0]
            .column(0)
            .as_primitive::<TimestampNanosecondType>();
        assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
    }
1690
    // With strict mode enabled, input whose fields are all covered by the
    // schema must decode successfully — checked for both a flat schema and
    // one containing a nested struct.
    #[test]
    fn test_strict_mode_no_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new("c", DataType::Boolean, false),
        ]));

        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);

        // Same exercise with "c" as a fully-described nested struct.
        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct(
                "c",
                vec![
                    Field::new("a", DataType::Boolean, false),
                    Field::new("b", DataType::Int16, false),
                ],
                false,
            ),
        ]));

        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);
    }
1728
    // With strict mode enabled, a field present in the data but absent from
    // the schema is an error — both at the top level and nested inside a
    // struct, where the error message is prefixed with the field path.
    #[test]
    fn test_strict_mode_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        // Schema omits top-level "b".
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("c", DataType::Boolean, true),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: column 'b' missing from schema"
        );

        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        // Schema omits "b" inside the nested struct "c".
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'c': column 'b' missing from schema"
        );
    }
1778
1779 fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1780 let file = File::open(path).unwrap();
1781 let mut reader = BufReader::new(file);
1782 let schema = schema.unwrap_or_else(|| {
1783 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1784 reader.rewind().unwrap();
1785 schema
1786 });
1787 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1788 builder.build(reader).unwrap()
1789 }
1790
    // End-to-end smoke test over test/data/basic.json with an inferred
    // schema: checks column positions, inferred types, and spot-checks
    // values in each of the first four columns.
    #[test]
    fn test_json_basic() {
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }
1829
1830 #[test]
1831 fn test_json_empty_projection() {
1832 let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1833 let batch = reader.next().unwrap().unwrap();
1834
1835 assert_eq!(0, batch.num_columns());
1836 assert_eq!(12, batch.num_rows());
1837 }
1838
    // Reads test/data/basic_nulls.json with an inferred schema and verifies
    // the per-column validity at specific row positions.
    #[test]
    fn test_json_basic_with_nulls() {
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }
1878
    // Supplies an explicit schema (note "b" as Float32 rather than the
    // inferred Float64) and verifies the reader honours it.
    #[test]
    fn test_json_basic_schema() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Boolean, false),
            Field::new("d", DataType::Utf8, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        let reader_schema = reader.schema();
        assert_eq!(reader_schema.as_ref(), &schema);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float32, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        // Large values must survive the explicit Int64 column untouched.
        assert_eq!(100000000000000, aa.value(11));
        let bb = batch.column(b.0).as_primitive::<Float32Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
    }
1914
    // A schema listing only a subset of the file's fields acts as a
    // projection: only those columns appear, in schema order.
    #[test]
    fn test_json_basic_schema_projection() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("c", DataType::Boolean, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(2, batch.num_columns());
        assert_eq!(2, batch.schema().fields().len());
        assert_eq!(12, batch.num_rows());

        assert_eq!(batch.schema().as_ref(), &schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(1, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
    }
1938
    // Reads test/data/arrays.json with an inferred schema: "b" and "c" must
    // infer as List columns, and their flattened child values are checked.
    #[test]
    fn test_json_arrays() {
        let mut reader = read_file("test/data/arrays.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(3, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
            b.1.data_type()
        );
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
            c.1.data_type()
        );
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        assert_eq!(1627668684594000000, aa.value(2));
        // Inspect the flattened child values of the Float64 list column.
        let bb = batch.column(b.0).as_list::<i32>();
        let bb = bb.values().as_primitive::<Float64Type>();
        assert_eq!(9, bb.len());
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-6.1, bb.value(5));
        assert!(!bb.is_valid(7));

        let cc = batch
            .column(c.0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let cc = cc.values().as_boolean();
        assert_eq!(6, cc.len());
        assert!(!cc.value(0));
        assert!(!cc.value(4));
        assert!(!cc.is_valid(5));
    }
1986
    // An explicitly empty array must decode as a valid zero-length list,
    // while JSON null and an absent key both decode as null list slots.
    #[test]
    fn test_empty_json_arrays() {
        let json_content = r#"
        {"items": []}
        {"items": null}
        {}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
            true,
        )]));

        let batches = do_read(json_content, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_list::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.value(0).is_empty());
        assert_eq!(col1.value(0).data_type(), &DataType::Null);
        assert!(col1.is_null(1));
        assert!(col1.is_null(2));
    }
2011
    // Nested lists of the Null type: empty inner arrays and arrays of JSON
    // nulls must both decode with the correct lengths and no spurious nulls
    // at the outer level.
    #[test]
    fn test_nested_empty_json_arrays() {
        let json_content = r#"
        {"items": [[],[]]}
        {"items": [[null, null],[null]]}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            DataType::List(FieldRef::new(Field::new_list_field(
                DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
                true,
            ))),
            true,
        )]));

        let batches = do_read(json_content, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_list::<i32>();
        assert_eq!(col1.null_count(), 0);
        // Row 0: two empty inner lists.
        assert_eq!(col1.value(0).len(), 2);
        assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
        assert!(col1.value(0).as_list::<i32>().value(1).is_empty());

        // Row 1: inner lists of lengths 2 and 1 (all-null elements).
        assert_eq!(col1.value(1).len(), 2);
        assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
        assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
    }
2041
2042 #[test]
2043 fn test_nested_list_json_arrays() {
2044 let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
2045 let a_struct_field = Field::new_struct(
2046 "a",
2047 vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
2048 true,
2049 );
2050 let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
2051 let schema = Arc::new(Schema::new(vec![a_field.clone()]));
2052 let builder = ReaderBuilder::new(schema).with_batch_size(64);
2053 let json_content = r#"
2054 {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
2055 {"a": [{"b": false, "c": null}]}
2056 {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
2057 {"a": null}
2058 {"a": []}
2059 {"a": [null]}
2060 "#;
2061 let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2062
2063 let d = StringArray::from(vec![
2065 Some("a_text"),
2066 Some("b_text"),
2067 None,
2068 Some("c_text"),
2069 Some("d_text"),
2070 None,
2071 None,
2072 ]);
2073 let c = ArrayDataBuilder::new(c_field.data_type().clone())
2074 .len(7)
2075 .add_child_data(d.to_data())
2076 .null_bit_buffer(Some(Buffer::from([0b00111011])))
2077 .build()
2078 .unwrap();
2079 let b = BooleanArray::from(vec![
2080 Some(true),
2081 Some(false),
2082 Some(false),
2083 Some(true),
2084 None,
2085 Some(true),
2086 None,
2087 ]);
2088 let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
2089 .len(7)
2090 .add_child_data(b.to_data())
2091 .add_child_data(c.clone())
2092 .null_bit_buffer(Some(Buffer::from([0b00111111])))
2093 .build()
2094 .unwrap();
2095 let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
2096 .len(6)
2097 .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
2098 .add_child_data(a)
2099 .null_bit_buffer(Some(Buffer::from([0b00110111])))
2100 .build()
2101 .unwrap();
2102 let expected = make_array(a_list);
2103
2104 let batch = reader.next().unwrap().unwrap();
2106 let read = batch.column(0);
2107 assert_eq!(read.len(), 6);
2108 let read: &ListArray = read.as_list::<i32>();
2110 let expected = expected.as_list::<i32>();
2111 assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
2112 assert_eq!(read.nulls(), expected.nulls());
2114 let struct_array = read.values().as_struct();
2116 let expected_struct_array = expected.values().as_struct();
2117
2118 assert_eq!(7, struct_array.len());
2119 assert_eq!(1, struct_array.null_count());
2120 assert_eq!(7, expected_struct_array.len());
2121 assert_eq!(1, expected_struct_array.null_count());
2122 assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
2124 let read_b = struct_array.column(0);
2126 assert_eq!(read_b.as_ref(), &b);
2127 let read_c = struct_array.column(1);
2128 assert_eq!(read_c.to_data(), c);
2129 let read_c = read_c.as_struct();
2130 let read_d = read_c.column(0);
2131 assert_eq!(read_d.as_ref(), &d);
2132
2133 assert_eq!(read, expected);
2134 }
2135
    // The leading empty line and the whitespace surrounding each record must
    // be skipped: three records decode into three rows.
    #[test]
    fn test_skip_empty_lines() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
        let json_content = "
        {\"a\": 1}
        {\"a\": 2}
        {\"a\": 3}";
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(3, batch.num_rows());

        let schema = reader.schema();
        let c = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, c.1.data_type());
    }
2154
2155 #[test]
2156 fn test_with_multiple_batches() {
2157 let file = File::open("test/data/basic_nulls.json").unwrap();
2158 let mut reader = BufReader::new(file);
2159 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2160 reader.rewind().unwrap();
2161
2162 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2163 let mut reader = builder.build(reader).unwrap();
2164
2165 let mut num_records = Vec::new();
2166 while let Some(rb) = reader.next().transpose().unwrap() {
2167 num_records.push(rb.num_rows());
2168 }
2169
2170 assert_eq!(vec![5, 5, 2], num_records);
2171 }
2172
    // Reads basic_nulls.json with column "a" declared as a second-precision
    // timestamp: integer JSON values must decode directly as epoch seconds.
    #[test]
    fn test_timestamp_from_json_seconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Second, None),
            a.1.data_type()
        );

        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2205
    // Same as the seconds variant, but column "a" is declared with
    // millisecond precision; raw integers pass through unscaled.
    #[test]
    fn test_timestamp_from_json_milliseconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Millisecond, None),
            a.1.data_type()
        );

        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2238
    // Reads basic_nulls.json with column "a" declared as Date64; integer
    // values decode directly, nulls are preserved.
    #[test]
    fn test_date_from_json_milliseconds() {
        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Date64, a.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Date64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2264
    // Reads basic_nulls.json with column "a" declared as nanosecond Time64;
    // integer values decode directly, nulls are preserved.
    #[test]
    fn test_time_from_json_nanoseconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Time64(TimeUnit::Nanosecond),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2294
    // Drives the reader through its Iterator implementation, accumulating
    // totals across the 3 batches produced by a batch size of 5 over 12 rows.
    #[test]
    fn test_json_iterator() {
        let file = File::open("test/data/basic.json").unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
        let reader = builder.build(reader).unwrap();
        let schema = reader.schema();
        let (col_a_index, _) = schema.column_with_name("a").unwrap();

        let mut sum_num_rows = 0;
        let mut num_batches = 0;
        let mut sum_a = 0;
        for batch in reader {
            let batch = batch.unwrap();
            assert_eq!(8, batch.num_columns());
            sum_num_rows += batch.num_rows();
            num_batches += 1;
            // Every batch must carry the reader's schema.
            let batch_schema = batch.schema();
            assert_eq!(schema, batch_schema);
            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
        }
        assert_eq!(12, sum_num_rows);
        assert_eq!(3, num_batches);
        assert_eq!(100000000000011, sum_a);
    }
2324
    // Verifies the decoder's error reporting: a truncated row stays buffered
    // across a failed flush, and type mismatches produce errors that name
    // the offending field path and echo the offending JSON value.
    #[test]
    fn test_decoder_error() {
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "a",
            vec![Field::new("child", DataType::Int32, false)],
            true,
        )]));

        // A partial row must survive a failed flush, not be discarded.
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
        let _ = decoder.flush().unwrap_err();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

        // Helper: decode a single row and return the resulting error string.
        let parse_err = |s: &str| {
            ReaderBuilder::new(schema.clone())
                .build(Cursor::new(s.as_bytes()))
                .unwrap()
                .next()
                .unwrap()
                .unwrap_err()
                .to_string()
        };

        let err = parse_err(r#"{"a": 123}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got 123"
        );

        let err = parse_err(r#"{"a": ["bar"]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
        );

        let err = parse_err(r#"{"a": []}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got []"
        );

        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
        );

        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
        );

        let err = parse_err(r#"{"a": true}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got true"
        );

        let err = parse_err(r#"{"a": false}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got false"
        );

        let err = parse_err(r#"{"a": "foo"}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got \"foo\""
        );

        // Nested failures accumulate the full field path in the message.
        let err = parse_err(r#"{"a": {"child": false}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
        );

        let err = parse_err(r#"{"a": {"child": []}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
        );

        let err = parse_err(r#"{"a": {"child": [123]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
        );

        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
        );
    }
2423
    // Serializes serde_json values straight into the decoder: a raw integer
    // passes through as epoch seconds, and an RFC3339 string with a +02:00
    // offset resolves to -7200 seconds from the epoch.
    #[test]
    fn test_serialize_timestamp() {
        let json = vec![
            json!({"timestamp": 1681319393}),
            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
        ];
        let schema = Schema::new(vec![Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 1);
        let values = batch.column(0).as_primitive::<TimestampSecondType>();
        assert_eq!(values.values(), &[1681319393, -7200]);
    }
2445
    // Decimal(10, 3) decoding from serialized values: fractional inputs
    // (1.234 and "1.234") scale to 1234, whole-number inputs (1234 and
    // "1234") scale to 1234000 — quoted and bare forms behave identically.
    #[test]
    fn test_serialize_decimal() {
        let json = vec![
            json!({"decimal": 1.234}),
            json!({"decimal": "1.234"}),
            json!({"decimal": 1234}),
            json!({"decimal": "1234"}),
        ];
        let schema = Schema::new(vec![Field::new(
            "decimal",
            DataType::Decimal128(10, 3),
            true,
        )]);
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(batch.num_columns(), 1);
        let values = batch.column(0).as_primitive::<Decimal128Type>();
        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
    }
2469
2470 #[test]
2471 fn test_serde_field() {
2472 let field = Field::new("int", DataType::Int32, true);
2473 let mut decoder = ReaderBuilder::new_with_field(field)
2474 .build_decoder()
2475 .unwrap();
2476 decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2477 let b = decoder.flush().unwrap().unwrap();
2478 let values = b.column(0).as_primitive::<Int32Type>().values();
2479 assert_eq!(values, &[1, 2, 3, 4]);
2480 }
2481
    // Values larger than u32/i32 range must survive serde-based decoding
    // intact, both into an Int64 column and a microsecond timestamp column.
    #[test]
    fn test_serde_large_numbers() {
        let field = Field::new("int", DataType::Int64, true);
        let mut decoder = ReaderBuilder::new_with_field(field)
            .build_decoder()
            .unwrap();

        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
        let b = decoder.flush().unwrap().unwrap();
        let values = b.column(0).as_primitive::<Int64Type>().values();
        assert_eq!(values, &[1699148028689, 2, 3, 4]);

        // Same input decoded as a microsecond timestamp column.
        let field = Field::new(
            "int",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            true,
        );
        let mut decoder = ReaderBuilder::new_with_field(field)
            .build_decoder()
            .unwrap();

        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
        let b = decoder.flush().unwrap().unwrap();
        let values = b
            .column(0)
            .as_primitive::<TimestampMicrosecondType>()
            .values();
        assert_eq!(values, &[1699148028689, 2, 3, 4]);
    }
2511
2512 #[test]
2513 fn test_coercing_primitive_into_string_decoder() {
2514 let buf = &format!(
2515 r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2516 (i32::MAX as i64 + 10),
2517 i64::MAX - 10
2518 );
2519 let schema = Schema::new(vec![
2520 Field::new("a", DataType::Float64, true),
2521 Field::new("b", DataType::Utf8, true),
2522 Field::new("c", DataType::Utf8, true),
2523 ]);
2524 let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2525 let schema_ref = Arc::new(schema);
2526
2527 let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2529 let mut decoder = reader.build_decoder().unwrap();
2530 decoder.serialize(json_array.as_slice()).unwrap();
2531 let batch = decoder.flush().unwrap().unwrap();
2532 assert_eq!(
2533 batch,
2534 RecordBatch::try_new(
2535 schema_ref,
2536 vec![
2537 Arc::new(Float64Array::from(vec![
2538 1.0,
2539 2.0,
2540 (i32::MAX as i64 + 10) as f64,
2541 (i64::MAX - 10) as f64
2542 ])),
2543 Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2544 Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2545 ]
2546 )
2547 .unwrap()
2548 );
2549 }
2550
2551 fn _parse_structs(
2556 row: &str,
2557 struct_mode: StructMode,
2558 fields: Fields,
2559 as_struct: bool,
2560 ) -> Result<RecordBatch, ArrowError> {
2561 let builder = if as_struct {
2562 ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2563 } else {
2564 ReaderBuilder::new(Arc::new(Schema::new(fields)))
2565 };
2566 builder
2567 .with_struct_mode(struct_mode)
2568 .build(Cursor::new(row.as_bytes()))
2569 .unwrap()
2570 .next()
2571 .unwrap()
2572 }
2573
2574 #[test]
2575 fn test_struct_decoding_list_length() {
2576 use arrow_array::array;
2577
2578 let row = "[1, 2]";
2579
2580 let mut fields = vec![Field::new("a", DataType::Int32, true)];
2581 let too_few_fields = Fields::from(fields.clone());
2582 fields.push(Field::new("b", DataType::Int32, true));
2583 let correct_fields = Fields::from(fields.clone());
2584 fields.push(Field::new("c", DataType::Int32, true));
2585 let too_many_fields = Fields::from(fields.clone());
2586
2587 let parse = |fields: Fields, as_struct: bool| {
2588 _parse_structs(row, StructMode::ListOnly, fields, as_struct)
2589 };
2590
2591 let expected_row = StructArray::new(
2592 correct_fields.clone(),
2593 vec![
2594 Arc::new(array::Int32Array::from(vec![1])),
2595 Arc::new(array::Int32Array::from(vec![2])),
2596 ],
2597 None,
2598 );
2599 let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);
2600
2601 assert_eq!(
2602 parse(too_few_fields.clone(), true).unwrap_err().to_string(),
2603 "Json error: found extra columns for 1 fields".to_string()
2604 );
2605 assert_eq!(
2606 parse(too_few_fields, false).unwrap_err().to_string(),
2607 "Json error: found extra columns for 1 fields".to_string()
2608 );
2609 assert_eq!(
2610 parse(correct_fields.clone(), true).unwrap(),
2611 RecordBatch::try_new(
2612 Arc::new(Schema::new(vec![row_field])),
2613 vec![Arc::new(expected_row.clone())]
2614 )
2615 .unwrap()
2616 );
2617 assert_eq!(
2618 parse(correct_fields, false).unwrap(),
2619 RecordBatch::from(expected_row)
2620 );
2621 assert_eq!(
2622 parse(too_many_fields.clone(), true)
2623 .unwrap_err()
2624 .to_string(),
2625 "Json error: found 2 columns for 3 fields".to_string()
2626 );
2627 assert_eq!(
2628 parse(too_many_fields, false).unwrap_err().to_string(),
2629 "Json error: found 2 columns for 3 fields".to_string()
2630 );
2631 }
2632
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        // One logical record expressed three ways: pure object syntax, pure
        // list syntax, and a mix (object at the top, list for the struct).
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        // Struct "a" has a list child "b" and a map child "c".
        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // Expected column values: b = [1, 2]
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        // Expected column values: c = {"d": 3}
        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly accepts the object form and rejects any list syntax,
        // reporting the offending field where applicable.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly is the mirror image: list form decodes, object syntax is
        // rejected at whatever level it appears.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
2716
2717 #[test]
2723 fn test_struct_decoding_empty_list() {
2724 let int_field = Field::new("a", DataType::Int32, true);
2725 let struct_field = Field::new(
2726 "r",
2727 DataType::Struct(Fields::from(vec![int_field.clone()])),
2728 true,
2729 );
2730
2731 let parse = |row: &str, as_struct: bool, field: Field| {
2732 _parse_structs(
2733 row,
2734 StructMode::ListOnly,
2735 Fields::from(vec![field]),
2736 as_struct,
2737 )
2738 };
2739
2740 assert_eq!(
2742 parse("[]", true, struct_field.clone())
2743 .unwrap_err()
2744 .to_string(),
2745 "Json error: found 0 columns for 1 fields".to_owned()
2746 );
2747 assert_eq!(
2748 parse("[]", false, int_field.clone())
2749 .unwrap_err()
2750 .to_string(),
2751 "Json error: found 0 columns for 1 fields".to_owned()
2752 );
2753 assert_eq!(
2754 parse("[]", false, struct_field.clone())
2755 .unwrap_err()
2756 .to_string(),
2757 "Json error: found 0 columns for 1 fields".to_owned()
2758 );
2759 assert_eq!(
2760 parse("[[]]", false, struct_field.clone())
2761 .unwrap_err()
2762 .to_string(),
2763 "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2764 );
2765 }
2766
2767 #[test]
2768 fn test_decode_list_struct_with_wrong_types() {
2769 let int_field = Field::new("a", DataType::Int32, true);
2770 let struct_field = Field::new(
2771 "r",
2772 DataType::Struct(Fields::from(vec![int_field.clone()])),
2773 true,
2774 );
2775
2776 let parse = |row: &str, as_struct: bool, field: Field| {
2777 _parse_structs(
2778 row,
2779 StructMode::ListOnly,
2780 Fields::from(vec![field]),
2781 as_struct,
2782 )
2783 };
2784
2785 assert_eq!(
2787 parse(r#"[["a"]]"#, false, struct_field.clone())
2788 .unwrap_err()
2789 .to_string(),
2790 "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2791 );
2792 assert_eq!(
2793 parse(r#"[["a"]]"#, true, struct_field.clone())
2794 .unwrap_err()
2795 .to_string(),
2796 "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2797 );
2798 assert_eq!(
2799 parse(r#"["a"]"#, true, int_field.clone())
2800 .unwrap_err()
2801 .to_string(),
2802 "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2803 );
2804 assert_eq!(
2805 parse(r#"["a"]"#, false, int_field.clone())
2806 .unwrap_err()
2807 .to_string(),
2808 "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2809 );
2810 }
2811}