1use crate::StructMode;
137use std::io::BufRead;
138use std::sync::Arc;
139
140use chrono::Utc;
141use serde::Serialize;
142
143use arrow_array::timezone::Tz;
144use arrow_array::types::*;
145use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader, StructArray};
146use arrow_data::ArrayData;
147use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
148pub use schema::*;
149
150use crate::reader::boolean_array::BooleanArrayDecoder;
151use crate::reader::decimal_array::DecimalArrayDecoder;
152use crate::reader::list_array::ListArrayDecoder;
153use crate::reader::map_array::MapArrayDecoder;
154use crate::reader::null_array::NullArrayDecoder;
155use crate::reader::primitive_array::PrimitiveArrayDecoder;
156use crate::reader::string_array::StringArrayDecoder;
157use crate::reader::string_view_array::StringViewArrayDecoder;
158use crate::reader::struct_array::StructArrayDecoder;
159use crate::reader::tape::{Tape, TapeDecoder};
160use crate::reader::timestamp_array::TimestampArrayDecoder;
161
162mod boolean_array;
163mod decimal_array;
164mod list_array;
165mod map_array;
166mod null_array;
167mod primitive_array;
168mod schema;
169mod serializer;
170mod string_array;
171mod string_view_array;
172mod struct_array;
173mod tape;
174mod timestamp_array;
175
/// A builder for constructing a JSON [`Reader`] or push-based [`Decoder`]
pub struct ReaderBuilder {
    // Maximum number of rows per emitted RecordBatch (default 1024)
    batch_size: usize,
    // If true, primitive JSON values (numbers, booleans) may be decoded
    // into string columns — passed through to the string decoders
    coerce_primitive: bool,
    // Passed through to the nested array decoders; see `make_decoder`
    strict_mode: bool,
    // If true, the schema holds a single field and each JSON value is
    // decoded as that field rather than as an object of columns
    is_field: bool,
    // Controls how struct values are decoded — see `StructMode`
    struct_mode: StructMode,

    // Schema of the batches produced by the resulting reader/decoder
    schema: SchemaRef,
}
186
187impl ReaderBuilder {
188 pub fn new(schema: SchemaRef) -> Self {
197 Self {
198 batch_size: 1024,
199 coerce_primitive: false,
200 strict_mode: false,
201 is_field: false,
202 struct_mode: Default::default(),
203 schema,
204 }
205 }
206
207 pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
238 Self {
239 batch_size: 1024,
240 coerce_primitive: false,
241 strict_mode: false,
242 is_field: true,
243 struct_mode: Default::default(),
244 schema: Arc::new(Schema::new([field.into()])),
245 }
246 }
247
248 pub fn with_batch_size(self, batch_size: usize) -> Self {
250 Self { batch_size, ..self }
251 }
252
253 pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
256 Self {
257 coerce_primitive,
258 ..self
259 }
260 }
261
262 pub fn with_strict_mode(self, strict_mode: bool) -> Self {
268 Self {
269 strict_mode,
270 ..self
271 }
272 }
273
274 pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
278 Self {
279 struct_mode,
280 ..self
281 }
282 }
283
284 pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
286 Ok(Reader {
287 reader,
288 decoder: self.build_decoder()?,
289 })
290 }
291
292 pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
294 let (data_type, nullable) = match self.is_field {
295 false => (DataType::Struct(self.schema.fields.clone()), false),
296 true => {
297 let field = &self.schema.fields[0];
298 (field.data_type().clone(), field.is_nullable())
299 }
300 };
301
302 let decoder = make_decoder(
303 data_type,
304 self.coerce_primitive,
305 self.strict_mode,
306 nullable,
307 self.struct_mode,
308 )?;
309
310 let num_fields = self.schema.flattened_fields().len();
311
312 Ok(Decoder {
313 decoder,
314 is_field: self.is_field,
315 tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
316 batch_size: self.batch_size,
317 schema: self.schema,
318 })
319 }
320}
321
/// Reads JSON from an input source into arrow [`RecordBatch`]es
pub struct Reader<R> {
    // Source of JSON bytes
    reader: R,
    // Push-based decoder fed from `reader`
    decoder: Decoder,
}
329
330impl<R> std::fmt::Debug for Reader<R> {
331 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332 f.debug_struct("Reader")
333 .field("decoder", &self.decoder)
334 .finish()
335 }
336}
337
impl<R: BufRead> Reader<R> {
    /// Reads the next [`RecordBatch`], returning `Ok(None)` once the input
    /// and any buffered rows are exhausted
    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        loop {
            let buf = self.reader.fill_buf()?;
            if buf.is_empty() {
                // End of input — flush whatever rows are buffered
                break;
            }
            let read = buf.len();

            let decoded = self.decoder.decode(buf)?;
            // Only the bytes the decoder actually consumed are marked as read
            self.reader.consume(decoded);
            if decoded != read {
                // Decoder stopped before the end of the buffer — presumably
                // because a full batch is buffered; flush it below
                break;
            }
        }
        self.decoder.flush()
    }
}
357
358impl<R: BufRead> Iterator for Reader<R> {
359 type Item = Result<RecordBatch, ArrowError>;
360
361 fn next(&mut self) -> Option<Self::Item> {
362 self.read().transpose()
363 }
364}
365
366impl<R: BufRead> RecordBatchReader for Reader<R> {
367 fn schema(&self) -> SchemaRef {
368 self.decoder.schema.clone()
369 }
370}
371
/// A push-based JSON decoder: bytes are fed in via [`Decoder::decode`] and
/// buffered rows are materialized into a [`RecordBatch`] by [`Decoder::flush`]
pub struct Decoder {
    // Buffers raw JSON into a tape representation
    tape_decoder: TapeDecoder,
    // Converts tape rows into ArrayData for the configured type
    decoder: Box<dyn ArrayDecoder>,
    // Maximum rows per flushed batch
    batch_size: usize,
    // If true, each value decodes as a single column (see ReaderBuilder::new_with_field)
    is_field: bool,
    // Schema of the flushed batches
    schema: SchemaRef,
}
419
420impl std::fmt::Debug for Decoder {
421 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
422 f.debug_struct("Decoder")
423 .field("schema", &self.schema)
424 .field("batch_size", &self.batch_size)
425 .finish()
426 }
427}
428
impl Decoder {
    /// Decodes JSON from `buf`, returning the number of bytes consumed
    ///
    /// May consume fewer bytes than provided (e.g. once enough rows have
    /// been buffered for a batch); callers should re-feed the remainder
    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.tape_decoder.decode(buf)
    }

    /// Serializes `rows` directly into the buffered tape
    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
        self.tape_decoder.serialize(rows)
    }

    /// Returns true if the decoder is part-way through decoding a row
    pub fn has_partial_record(&self) -> bool {
        self.tape_decoder.has_partial_row()
    }

    /// Returns the number of complete rows currently buffered
    pub fn len(&self) -> usize {
        self.tape_decoder.num_buffered_rows()
    }

    /// Returns true if no complete rows are buffered
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Flushes the buffered rows into a [`RecordBatch`], returning `Ok(None)`
    /// if nothing is buffered
    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let tape = self.tape_decoder.finish()?;

        if tape.num_rows() == 0 {
            return Ok(None);
        }

        // Walk the tape collecting the starting offset of each row; offsets
        // begin at 1, and `tape.next` returns the offset past the current row
        let mut next_object = 1;
        let pos: Vec<_> = (0..tape.num_rows())
            .map(|_| {
                let next = tape.next(next_object, "row").unwrap();
                // Record the current offset and advance to the next row
                std::mem::replace(&mut next_object, next)
            })
            .collect();

        let decoded = self.decoder.decode(&tape, &pos)?;
        self.tape_decoder.clear();

        // In field mode the decoded data is the batch's single column;
        // otherwise the decoded struct's children become the columns
        let batch = match self.is_field {
            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
            false => {
                RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
            }
        };

        Ok(Some(batch))
    }
}
670
/// Decodes rows from a [`Tape`] into [`ArrayData`] of a specific type
trait ArrayDecoder: Send {
    /// Decode the elements starting at each offset in `pos` within `tape`
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
675
/// Expands to a boxed [`PrimitiveArrayDecoder`] for primitive type `$t`,
/// wrapped in `Ok` for use as a `make_decoder` match arm
macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}
681
/// Creates the [`ArrayDecoder`] for `data_type`, threading the reader
/// options through to nested decoders (lists, structs, maps)
fn make_decoder(
    data_type: DataType,
    coerce_primitive: bool,
    strict_mode: bool,
    is_nullable: bool,
    struct_mode: StructMode,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    // `downcast_integer!` generates match arms for all integer types,
    // dispatching them to `primitive_decoder!`; the remaining arms handle
    // the non-integer types explicitly
    downcast_integer! {
        data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        // Timestamps without a timezone are decoded relative to UTC
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        // Timestamps with a timezone parse the tz string, failing early on
        // an invalid timezone
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        // Nested types recurse via their child decoders, forwarding options
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        // Binary types have no natural JSON representation
        DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => {
            Err(ArrowError::JsonError(format!("{data_type} is not supported by JSON")))
        }
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader")))
    }
}
749
750#[cfg(test)]
751mod tests {
752 use serde_json::json;
753 use std::fs::File;
754 use std::io::{BufReader, Cursor, Seek};
755
756 use arrow_array::cast::AsArray;
757 use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
758 use arrow_buffer::{ArrowNativeType, Buffer};
759 use arrow_cast::display::{ArrayFormatter, FormatOptions};
760 use arrow_data::ArrayDataBuilder;
761 use arrow_schema::{Field, Fields};
762
763 use super::*;
764
765 fn do_read(
766 buf: &str,
767 batch_size: usize,
768 coerce_primitive: bool,
769 strict_mode: bool,
770 schema: SchemaRef,
771 ) -> Vec<RecordBatch> {
772 let mut unbuffered = vec![];
773
774 for batch_size in [1, 3, 100, batch_size] {
776 unbuffered = ReaderBuilder::new(schema.clone())
777 .with_batch_size(batch_size)
778 .with_coerce_primitive(coerce_primitive)
779 .build(Cursor::new(buf.as_bytes()))
780 .unwrap()
781 .collect::<Result<Vec<_>, _>>()
782 .unwrap();
783
784 for b in unbuffered.iter().take(unbuffered.len() - 1) {
785 assert_eq!(b.num_rows(), batch_size)
786 }
787
788 for b in [1, 3, 5] {
790 let buffered = ReaderBuilder::new(schema.clone())
791 .with_batch_size(batch_size)
792 .with_coerce_primitive(coerce_primitive)
793 .with_strict_mode(strict_mode)
794 .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
795 .unwrap()
796 .collect::<Result<Vec<_>, _>>()
797 .unwrap();
798 assert_eq!(unbuffered, buffered);
799 }
800 }
801
802 unbuffered
803 }
804
    // Exercises the basic decode path: mixed-order keys, missing keys,
    // blank lines, numeric coercion into integer/date columns, and the
    // push-based Decoder bookkeeping (len / is_empty / has_partial_record)
    #[test]
    fn test_basic() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        // 221 is the full byte length of `buf` — everything is consumed
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        // Flushing drains all buffered rows
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
871
    // Exercises Utf8 and LargeUtf8 decoding, including escape sequences,
    // surrogate-pair unicode escapes, nulls, and the empty string
    #[test]
    fn test_string() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        // The \uXXXX escapes above decode to these literal characters
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        // Empty string is distinct from null
        assert_eq!(col2.value(4), "");
    }
907
    // Verifies that a Utf8View column's data buffer is large enough to hold
    // all long (non-inlined) string payloads
    #[test]
    fn test_long_string_view_allocation() {
        // Minimum bytes expected in the view data buffer for the long
        // strings below — presumably the sum of the non-inlined payloads;
        // TODO confirm against StringViewArray's inlining threshold
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // First buffer of a StringViewArray holds the out-of-line string data
        let data_buffer = string_view_array.to_data().buffers()[0].len();

        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }
960
    // Verifies Utf8View buffer sizing when numbers are coerced to strings
    // (coerce_primitive = true)
    #[test]
    fn test_numeric_view_allocation() {
        // Minimum bytes expected in the view data buffer for the coerced
        // numeric strings below — TODO confirm against the inlining threshold
        let expected_capacity: usize = 33;

        let buf = r#"
        {"n": 123456789}
        {"n": 1000000000000}
        {"n": 3.1415}
        {"n": 2.718281828459045}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // First buffer of a StringViewArray holds the out-of-line string data
        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Numbers are coerced to their literal JSON text
        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1005
    // Same input as `test_string`, but column "a" decoded as Utf8View.
    // NOTE(review): "uft8view" in the name is a typo for "utf8view"; left
    // as-is here since renaming a test changes the harness-visible name.
    #[test]
    fn test_string_with_uft8view() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string_view();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        // The \uXXXX escapes above decode to these literal characters
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert_eq!(col1.data_type(), &DataType::Utf8View);

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }
1042
    // Exercises nested types: lists, structs, and a struct containing a
    // list of structs, including null propagation through nesting levels
    #[test]
    fn test_complex() {
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        {"list": null, "nested": {"a": null}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
            Field::new_struct(
                "nested",
                vec![
                    Field::new("a", DataType::Int32, true),
                    Field::new("b", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("c", DataType::Int32, false)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let list = batches[0].column(0).as_list::<i32>();
        assert_eq!(list.len(), 3);
        // Empty list, [5, 6], then null (null rows don't advance the offset)
        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
        assert_eq!(list.null_count(), 1);
        assert!(list.is_null(2));
        let list_values = list.values().as_primitive::<Int32Type>();
        assert_eq!(list_values.values(), &[5, 6]);

        let nested = batches[0].column(1).as_struct();
        let a = nested.column(0).as_primitive::<Int32Type>();
        // NOTE(review): the two `list` assertions below duplicate checks made
        // above — they may have been intended for `a`/`nested`; confirm
        assert_eq!(list.null_count(), 1);
        assert_eq!(a.values(), &[1, 7, 0]);
        assert!(list.is_null(2));

        let b = nested.column(1).as_primitive::<Int32Type>();
        assert_eq!(b.null_count(), 2);
        assert_eq!(b.len(), 3);
        assert_eq!(b.value(0), 2);
        assert!(b.is_null(1));
        assert!(b.is_null(2));

        // "nested_list" is absent on the third row, making the struct null
        let nested_list = batches[0].column(2).as_struct();
        assert_eq!(nested_list.len(), 3);
        assert_eq!(nested_list.null_count(), 1);
        assert!(nested_list.is_null(2));

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.len(), 3);
        assert_eq!(list2.null_count(), 1);
        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
        assert!(list2.is_null(2));

        let list2_values = list2.values().as_struct();

        let c = list2_values.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[3, 4]);
    }
1116
    // Verifies that fields absent from the schema are skipped (projection):
    // the schema selects a subset of keys at each nesting level
    #[test]
    fn test_projection() {
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        // Only "nested.a" and "nested_list.list2.d" are projected
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // "d" is present in the first list element only
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1172
    // Exercises Map decoding with list-typed values, including null values,
    // empty lists, and display formatting of the resulting MapArray
    #[test]
    fn test_map() {
        let buf = r#"
        {"map": {"a": ["foo", null]}}
        {"map": {"a": [null], "b": []}}
        {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // 1, 2 and 2 entries per row respectively
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        // The value for key "c" is a null list
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1215
    // Without coerce_primitive, numbers and booleans in a string column
    // must produce a descriptive decode error
    #[test]
    fn test_not_coercing_primitive_into_string_without_flag() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));

        let buf = r#"{"a": 1}"#;
        let err = ReaderBuilder::new(schema.clone())
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got 1"
        );

        let buf = r#"{"a": true}"#;
        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got true"
        );
    }
1246
    // With coerce_primitive, numbers and booleans decode into string
    // columns preserving their literal JSON text (e.g. "2E0", "4e0")
    #[test]
    fn test_coercing_primitive_into_string() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        // Coercion keeps the original textual form, not a normalized number
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1295
    // Shared decimal test: decodes numbers and numeric strings into a
    // decimal type with scale 2, so e.g. 38.30 becomes 3830.
    // Values are compared via `usize_as` to work for both i128 and i256.
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        // "2.0452" rounds to 204 at scale 2 — presumably truncation/rounding
        // to the target scale; values scaled by 10^2
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1345
    // Runs the shared decimal test for both 128-bit and 256-bit widths
    #[test]
    fn test_decimals() {
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1351
    // Shared timestamp test, generic over the time unit. Expected values are
    // written in nanoseconds and divided by `unit_in_nanos` so one table of
    // constants serves all four units. Column "d" carries a +08:00 timezone,
    // so zone-less strings there are interpreted in that offset.
    fn test_timestamp<T: ArrowTimestampType>() {
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Scale factor from nanoseconds to the unit under test
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // Raw numbers (40, 1234) pass through untouched; strings are parsed
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1433
    // Runs the shared timestamp test for all four time units
    #[test]
    fn test_timestamps() {
        test_timestamp::<TimestampSecondType>();
        test_timestamp::<TimestampMillisecondType>();
        test_timestamp::<TimestampMicrosecondType>();
        test_timestamp::<TimestampNanosecondType>();
    }
1441
    // Shared time-of-day test, generic over Time32/Time64 types. As with
    // test_timestamp, expected values are in nanoseconds-since-midnight
    // divided by `unit_in_nanos` for the unit under test.
    fn test_time<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        // Extract the unit from the concrete Time32/Time64 type
        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // Raw numbers (40, 1234) pass through; AM/PM forms are parsed
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1513
1514 #[test]
1515 fn test_times() {
1516 test_time::<Time32MillisecondType>();
1517 test_time::<Time32SecondType>();
1518 test_time::<Time64MicrosecondType>();
1519 test_time::<Time64NanosecondType>();
1520 }
1521
1522 fn test_duration<T: ArrowTemporalType>() {
1523 let buf = r#"
1524 {"a": 1, "b": "2"}
1525 {"a": 3, "b": null}
1526 "#;
1527
1528 let schema = Arc::new(Schema::new(vec![
1529 Field::new("a", T::DATA_TYPE, true),
1530 Field::new("b", T::DATA_TYPE, true),
1531 ]));
1532
1533 let batches = do_read(buf, 1024, true, false, schema);
1534 assert_eq!(batches.len(), 1);
1535
1536 let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1537 assert_eq!(col_a.null_count(), 0);
1538 assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1539
1540 let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1541 assert_eq!(col2.null_count(), 1);
1542 assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1543 }
1544
1545 #[test]
1546 fn test_durations() {
1547 test_duration::<DurationNanosecondType>();
1548 test_duration::<DurationMicrosecondType>();
1549 test_duration::<DurationMillisecondType>();
1550 test_duration::<DurationSecondType>();
1551 }
1552
    /// Regression-style test modelled on a Delta Lake checkpoint record: a
    /// struct column present in the JSON plus a struct-of-map column ("add")
    /// that is entirely absent from the input and must decode as null.
    #[test]
    fn test_delta_checkpoint() {
        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "protocol",
                vec![
                    Field::new("minReaderVersion", DataType::Int32, true),
                    Field::new("minWriterVersion", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "add",
                vec![Field::new_map(
                    "partitionValues",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                    false,
                    false,
                )],
                true,
            ),
        ]));

        let batches = do_read(json, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Render the whole row as text to verify both the decoded "protocol"
        // values and the null "add" struct in a single assertion.
        let s: StructArray = batches.into_iter().next().unwrap().into();
        let opts = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
        assert_eq!(
            formatter.value(0).to_string(),
            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
        );
    }
1590
1591 #[test]
1592 fn struct_nullability() {
1593 let do_test = |child: DataType| {
1594 let non_null = r#"{"foo": {}}"#;
1596 let schema = Arc::new(Schema::new(vec![Field::new_struct(
1597 "foo",
1598 vec![Field::new("bar", child, false)],
1599 true,
1600 )]));
1601 let mut reader = ReaderBuilder::new(schema.clone())
1602 .build(Cursor::new(non_null.as_bytes()))
1603 .unwrap();
1604 assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": {bar: null}}"#;
1607 let mut reader = ReaderBuilder::new(schema.clone())
1608 .build(Cursor::new(null.as_bytes()))
1609 .unwrap();
1610 assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": null}"#;
1614 let mut reader = ReaderBuilder::new(schema)
1615 .build(Cursor::new(null.as_bytes()))
1616 .unwrap();
1617 let batch = reader.next().unwrap().unwrap();
1618 assert_eq!(batch.num_columns(), 1);
1619 let foo = batch.column(0).as_struct();
1620 assert_eq!(foo.len(), 1);
1621 assert!(foo.is_null(0));
1622 assert_eq!(foo.num_columns(), 1);
1623
1624 let bar = foo.column(0);
1625 assert_eq!(bar.len(), 1);
1626 assert!(bar.is_null(0));
1628 };
1629
1630 do_test(DataType::Boolean);
1631 do_test(DataType::Int32);
1632 do_test(DataType::Utf8);
1633 do_test(DataType::Decimal128(2, 1));
1634 do_test(DataType::Timestamp(
1635 TimeUnit::Microsecond,
1636 Some("+00:00".into()),
1637 ));
1638 }
1639
1640 #[test]
1641 fn test_truncation() {
1642 let buf = r#"
1643 {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1644 {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1645 {"i64": -9223372036854775808, "u64": 0 }
1646 {"i64": "-9223372036854775808", "u64": 0 }
1647 "#;
1648
1649 let schema = Arc::new(Schema::new(vec![
1650 Field::new("i64", DataType::Int64, true),
1651 Field::new("u64", DataType::UInt64, true),
1652 ]));
1653
1654 let batches = do_read(buf, 1024, true, false, schema);
1655 assert_eq!(batches.len(), 1);
1656
1657 let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1658 assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1659
1660 let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1661 assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1662 }
1663
1664 #[test]
1665 fn test_timestamp_truncation() {
1666 let buf = r#"
1667 {"time": 9223372036854775807 }
1668 {"time": -9223372036854775808 }
1669 {"time": 9e5 }
1670 "#;
1671
1672 let schema = Arc::new(Schema::new(vec![Field::new(
1673 "time",
1674 DataType::Timestamp(TimeUnit::Nanosecond, None),
1675 true,
1676 )]));
1677
1678 let batches = do_read(buf, 1024, true, false, schema);
1679 assert_eq!(batches.len(), 1);
1680
1681 let i64 = batches[0]
1682 .column(0)
1683 .as_primitive::<TimestampNanosecondType>();
1684 assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1685 }
1686
    /// Strict mode succeeds when the schema covers every column in the input,
    /// both for flat columns and for nested struct fields.
    #[test]
    fn test_strict_mode_no_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new("c", DataType::Boolean, false),
        ]));

        // NOTE(review): the fourth `do_read` argument appears to toggle strict
        // mode (the non-strict tests pass `false` there) — confirm against the
        // helper's definition.
        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);

        // Same check with "c" as a fully described nested struct.
        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct(
                "c",
                vec![
                    Field::new("a", DataType::Boolean, false),
                    Field::new("b", DataType::Int16, false),
                ],
                false,
            ),
        ]));

        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);
    }
1724
    /// Strict mode rejects input containing columns the schema does not
    /// declare, reporting the offending column — including the field path for
    /// columns nested inside structs.
    #[test]
    fn test_strict_mode_missing_columns_in_schema() {
        // Top level: "b" appears in the data but not in the schema.
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("c", DataType::Boolean, true),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: column 'b' missing from schema"
        );

        // Nested: struct "c" is declared but its "b" child is not.
        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'c': column 'b' missing from schema"
        );
    }
1774
1775 fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1776 let file = File::open(path).unwrap();
1777 let mut reader = BufReader::new(file);
1778 let schema = schema.unwrap_or_else(|| {
1779 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1780 reader.rewind().unwrap();
1781 schema
1782 });
1783 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1784 builder.build(reader).unwrap()
1785 }
1786
    /// End-to-end read of test/data/basic.json with an inferred schema:
    /// verifies column ordering, inferred types, and sampled values.
    #[test]
    fn test_json_basic() {
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader and batch must agree on the inferred schema.
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Inferred types: int64, float64, bool, utf8 — in file order.
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check decoded values per column.
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }
1825
1826 #[test]
1827 fn test_json_empty_projection() {
1828 let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1829 let batch = reader.next().unwrap().unwrap();
1830
1831 assert_eq!(0, batch.num_columns());
1832 assert_eq!(12, batch.num_rows());
1833 }
1834
    /// Reads test/data/basic_nulls.json with an inferred schema and verifies
    /// the null masks of each column at sampled positions.
    #[test]
    fn test_json_basic_with_nulls() {
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader and batch must agree on the inferred schema.
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check validity at positions known to be null / non-null.
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }
1874
    /// Reads basic.json with an explicit schema, overriding inference
    /// (e.g. "b" as Float32 rather than the inferred Float64).
    #[test]
    fn test_json_basic_schema() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Boolean, false),
            Field::new("d", DataType::Utf8, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        // The reader must report exactly the schema it was given.
        let reader_schema = reader.schema();
        assert_eq!(reader_schema.as_ref(), &schema);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float32, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Values decode under the requested types.
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(100000000000000, aa.value(11));
        let bb = batch.column(b.0).as_primitive::<Float32Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
    }
1910
1911 #[test]
1912 fn test_json_basic_schema_projection() {
1913 let schema = Schema::new(vec![
1914 Field::new("a", DataType::Int64, true),
1915 Field::new("c", DataType::Boolean, false),
1916 ]);
1917
1918 let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1919 let batch = reader.next().unwrap().unwrap();
1920
1921 assert_eq!(2, batch.num_columns());
1922 assert_eq!(2, batch.schema().fields().len());
1923 assert_eq!(12, batch.num_rows());
1924
1925 assert_eq!(batch.schema().as_ref(), &schema);
1926
1927 let a = schema.column_with_name("a").unwrap();
1928 assert_eq!(0, a.0);
1929 assert_eq!(&DataType::Int64, a.1.data_type());
1930 let c = schema.column_with_name("c").unwrap();
1931 assert_eq!(1, c.0);
1932 assert_eq!(&DataType::Boolean, c.1.data_type());
1933 }
1934
    /// Reads test/data/arrays.json: list columns are inferred as
    /// List&lt;Float64&gt; / List&lt;Boolean&gt; and their flattened child values checked.
    #[test]
    fn test_json_arrays() {
        let mut reader = read_file("test/data/arrays.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(3, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
            b.1.data_type()
        );
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
            c.1.data_type()
        );
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        assert_eq!(1627668684594000000, aa.value(2));
        // "b": inspect the flattened child values of the list column.
        let bb = batch.column(b.0).as_list::<i32>();
        let bb = bb.values().as_primitive::<Float64Type>();
        assert_eq!(9, bb.len());
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-6.1, bb.value(5));
        assert!(!bb.is_valid(7));

        // "c": same idea, via an explicit downcast to ListArray.
        let cc = batch
            .column(c.0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let cc = cc.values().as_boolean();
        assert_eq!(6, cc.len());
        assert!(!cc.value(0));
        assert!(!cc.value(4));
        assert!(!cc.is_valid(5));
    }
1982
1983 #[test]
1984 fn test_empty_json_arrays() {
1985 let json_content = r#"
1986 {"items": []}
1987 {"items": null}
1988 {}
1989 "#;
1990
1991 let schema = Arc::new(Schema::new(vec![Field::new(
1992 "items",
1993 DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
1994 true,
1995 )]));
1996
1997 let batches = do_read(json_content, 1024, false, false, schema);
1998 assert_eq!(batches.len(), 1);
1999
2000 let col1 = batches[0].column(0).as_list::<i32>();
2001 assert_eq!(col1.null_count(), 2);
2002 assert!(col1.value(0).is_empty());
2003 assert_eq!(col1.value(0).data_type(), &DataType::Null);
2004 assert!(col1.is_null(1));
2005 assert!(col1.is_null(2));
2006 }
2007
2008 #[test]
2009 fn test_nested_empty_json_arrays() {
2010 let json_content = r#"
2011 {"items": [[],[]]}
2012 {"items": [[null, null],[null]]}
2013 "#;
2014
2015 let schema = Arc::new(Schema::new(vec![Field::new(
2016 "items",
2017 DataType::List(FieldRef::new(Field::new_list_field(
2018 DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2019 true,
2020 ))),
2021 true,
2022 )]));
2023
2024 let batches = do_read(json_content, 1024, false, false, schema);
2025 assert_eq!(batches.len(), 1);
2026
2027 let col1 = batches[0].column(0).as_list::<i32>();
2028 assert_eq!(col1.null_count(), 0);
2029 assert_eq!(col1.value(0).len(), 2);
2030 assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
2031 assert!(col1.value(0).as_list::<i32>().value(1).is_empty());
2032
2033 assert_eq!(col1.value(1).len(), 2);
2034 assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
2035 assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
2036 }
2037
    /// Decodes a list-of-structs column (with a nested struct child) and
    /// compares the result against an expected array hand-built via
    /// `ArrayDataBuilder`, including offsets and all three null bitmaps.
    #[test]
    fn test_nested_list_json_arrays() {
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        // Six rows covering: multiple elements, null child struct, null
        // leaf values, a null list, an empty list, and a list of one null.
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // Build the expected array bottom-up: leaf "d", struct "c",
        // boolean "b", struct "a", then the outer list. The bit buffers are
        // validity masks over the 7 flattened struct elements / 6 list rows.
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        let c = ArrayDataBuilder::new(c_field.data_type().clone())
            .len(7)
            .add_child_data(d.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00111011])))
            .build()
            .unwrap();
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
            .len(7)
            .add_child_data(b.to_data())
            .add_child_data(c.clone())
            .null_bit_buffer(Some(Buffer::from([0b00111111])))
            .build()
            .unwrap();
        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
            .add_child_data(a)
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        let expected = make_array(a_list);

        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        let read: &ListArray = read.as_list::<i32>();
        let expected = expected.as_list::<i32>();
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        assert_eq!(read.nulls(), expected.nulls());
        // Compare the flattened struct children piecewise before the final
        // whole-array equality check.
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.to_data(), c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        assert_eq!(read, expected);
    }
2131
2132 #[test]
2133 fn test_skip_empty_lines() {
2134 let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2135 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
2136 let json_content = "
2137 {\"a\": 1}
2138 {\"a\": 2}
2139 {\"a\": 3}";
2140 let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2141 let batch = reader.next().unwrap().unwrap();
2142
2143 assert_eq!(1, batch.num_columns());
2144 assert_eq!(3, batch.num_rows());
2145
2146 let schema = reader.schema();
2147 let c = schema.column_with_name("a").unwrap();
2148 assert_eq!(&DataType::Int64, c.1.data_type());
2149 }
2150
2151 #[test]
2152 fn test_with_multiple_batches() {
2153 let file = File::open("test/data/basic_nulls.json").unwrap();
2154 let mut reader = BufReader::new(file);
2155 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2156 reader.rewind().unwrap();
2157
2158 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2159 let mut reader = builder.build(reader).unwrap();
2160
2161 let mut num_records = Vec::new();
2162 while let Some(rb) = reader.next().transpose().unwrap() {
2163 num_records.push(rb.num_rows());
2164 }
2165
2166 assert_eq!(vec![5, 5, 2], num_records);
2167 }
2168
    /// Integer JSON values read into Timestamp(Second): values pass through
    /// as epoch seconds, and JSON nulls become null slots.
    #[test]
    fn test_timestamp_from_json_seconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader and batch must agree on the provided schema.
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Second, None),
            a.1.data_type()
        );

        // Nulls propagate; non-null integers decode verbatim.
        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2201
    /// Integer JSON values read into Timestamp(Millisecond): values pass
    /// through as epoch milliseconds, and JSON nulls become null slots.
    #[test]
    fn test_timestamp_from_json_milliseconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader and batch must agree on the provided schema.
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Millisecond, None),
            a.1.data_type()
        );

        // Nulls propagate; non-null integers decode verbatim.
        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2234
2235 #[test]
2236 fn test_date_from_json_milliseconds() {
2237 let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
2238
2239 let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2240 let batch = reader.next().unwrap().unwrap();
2241
2242 assert_eq!(1, batch.num_columns());
2243 assert_eq!(12, batch.num_rows());
2244
2245 let schema = reader.schema();
2246 let batch_schema = batch.schema();
2247 assert_eq!(schema, batch_schema);
2248
2249 let a = schema.column_with_name("a").unwrap();
2250 assert_eq!(&DataType::Date64, a.1.data_type());
2251
2252 let aa = batch.column(a.0).as_primitive::<Date64Type>();
2253 assert!(aa.is_valid(0));
2254 assert!(!aa.is_valid(1));
2255 assert!(!aa.is_valid(2));
2256 assert_eq!(1, aa.value(0));
2257 assert_eq!(1, aa.value(3));
2258 assert_eq!(5, aa.value(7));
2259 }
2260
    /// Integer JSON values read into Time64(Nanosecond): values pass through
    /// verbatim, and JSON nulls become null slots.
    #[test]
    fn test_time_from_json_nanoseconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Time64(TimeUnit::Nanosecond),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader and batch must agree on the provided schema.
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());

        // Nulls propagate; non-null integers decode verbatim.
        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2290
    /// Consumes the reader through its `Iterator` impl, accumulating row
    /// counts, batch count, and the sum of column "a" across all batches.
    #[test]
    fn test_json_iterator() {
        let file = File::open("test/data/basic.json").unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        // Inference consumed the stream; rewind before building the reader.
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
        let reader = builder.build(reader).unwrap();
        let schema = reader.schema();
        let (col_a_index, _) = schema.column_with_name("a").unwrap();

        let mut sum_num_rows = 0;
        let mut num_batches = 0;
        let mut sum_a = 0;
        for batch in reader {
            let batch = batch.unwrap();
            assert_eq!(8, batch.num_columns());
            sum_num_rows += batch.num_rows();
            num_batches += 1;
            let batch_schema = batch.schema();
            assert_eq!(schema, batch_schema);
            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
        }
        // 12 rows at batch_size 5 -> three batches (5, 5, 2).
        assert_eq!(12, sum_num_rows);
        assert_eq!(3, num_batches);
        assert_eq!(100000000000011, sum_a);
    }
2320
    /// Error paths of the push-based `Decoder`: a truncated row stays
    /// buffered across a failed flush, and type mismatches surface as
    /// descriptive `Json error` messages carrying the field path.
    #[test]
    fn test_decoder_error() {
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "a",
            vec![Field::new("child", DataType::Int32, false)],
            true,
        )]));

        // Feed an incomplete row: the tape decoder must keep it buffered,
        // and a failed flush must not discard it.
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
        let _ = decoder.flush().unwrap_err();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

        // Helper: decode a single row and return the resulting error string.
        let parse_err = |s: &str| {
            ReaderBuilder::new(schema.clone())
                .build(Cursor::new(s.as_bytes()))
                .unwrap()
                .next()
                .unwrap()
                .unwrap_err()
                .to_string()
        };

        // Non-object values where a struct is expected.
        let err = parse_err(r#"{"a": 123}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got 123"
        );

        let err = parse_err(r#"{"a": ["bar"]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
        );

        let err = parse_err(r#"{"a": []}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got []"
        );

        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
        );

        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
        );

        let err = parse_err(r#"{"a": true}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got true"
        );

        let err = parse_err(r#"{"a": false}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got false"
        );

        let err = parse_err(r#"{"a": "foo"}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got \"foo\""
        );

        // Nested mismatches report the full field path.
        let err = parse_err(r#"{"a": {"child": false}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
        );

        let err = parse_err(r#"{"a": {"child": []}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
        );

        let err = parse_err(r#"{"a": {"child": [123]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
        );

        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
        );
    }
2419
2420 #[test]
2421 fn test_serialize_timestamp() {
2422 let json = vec![
2423 json!({"timestamp": 1681319393}),
2424 json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2425 ];
2426 let schema = Schema::new(vec![Field::new(
2427 "timestamp",
2428 DataType::Timestamp(TimeUnit::Second, None),
2429 true,
2430 )]);
2431 let mut decoder = ReaderBuilder::new(Arc::new(schema))
2432 .build_decoder()
2433 .unwrap();
2434 decoder.serialize(&json).unwrap();
2435 let batch = decoder.flush().unwrap().unwrap();
2436 assert_eq!(batch.num_rows(), 2);
2437 assert_eq!(batch.num_columns(), 1);
2438 let values = batch.column(0).as_primitive::<TimestampSecondType>();
2439 assert_eq!(values.values(), &[1681319393, -7200]);
2440 }
2441
2442 #[test]
2443 fn test_serialize_decimal() {
2444 let json = vec![
2445 json!({"decimal": 1.234}),
2446 json!({"decimal": "1.234"}),
2447 json!({"decimal": 1234}),
2448 json!({"decimal": "1234"}),
2449 ];
2450 let schema = Schema::new(vec![Field::new(
2451 "decimal",
2452 DataType::Decimal128(10, 3),
2453 true,
2454 )]);
2455 let mut decoder = ReaderBuilder::new(Arc::new(schema))
2456 .build_decoder()
2457 .unwrap();
2458 decoder.serialize(&json).unwrap();
2459 let batch = decoder.flush().unwrap().unwrap();
2460 assert_eq!(batch.num_rows(), 4);
2461 assert_eq!(batch.num_columns(), 1);
2462 let values = batch.column(0).as_primitive::<Decimal128Type>();
2463 assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2464 }
2465
2466 #[test]
2467 fn test_serde_field() {
2468 let field = Field::new("int", DataType::Int32, true);
2469 let mut decoder = ReaderBuilder::new_with_field(field)
2470 .build_decoder()
2471 .unwrap();
2472 decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2473 let b = decoder.flush().unwrap().unwrap();
2474 let values = b.column(0).as_primitive::<Int32Type>().values();
2475 assert_eq!(values, &[1, 2, 3, 4]);
2476 }
2477
2478 #[test]
2479 fn test_serde_large_numbers() {
2480 let field = Field::new("int", DataType::Int64, true);
2481 let mut decoder = ReaderBuilder::new_with_field(field)
2482 .build_decoder()
2483 .unwrap();
2484
2485 decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2486 let b = decoder.flush().unwrap().unwrap();
2487 let values = b.column(0).as_primitive::<Int64Type>().values();
2488 assert_eq!(values, &[1699148028689, 2, 3, 4]);
2489
2490 let field = Field::new(
2491 "int",
2492 DataType::Timestamp(TimeUnit::Microsecond, None),
2493 true,
2494 );
2495 let mut decoder = ReaderBuilder::new_with_field(field)
2496 .build_decoder()
2497 .unwrap();
2498
2499 decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2500 let b = decoder.flush().unwrap().unwrap();
2501 let values = b
2502 .column(0)
2503 .as_primitive::<TimestampMicrosecondType>()
2504 .values();
2505 assert_eq!(values, &[1699148028689, 2, 3, 4]);
2506 }
2507
2508 #[test]
2509 fn test_coercing_primitive_into_string_decoder() {
2510 let buf = &format!(
2511 r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2512 (i32::MAX as i64 + 10),
2513 i64::MAX - 10
2514 );
2515 let schema = Schema::new(vec![
2516 Field::new("a", DataType::Float64, true),
2517 Field::new("b", DataType::Utf8, true),
2518 Field::new("c", DataType::Utf8, true),
2519 ]);
2520 let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2521 let schema_ref = Arc::new(schema);
2522
2523 let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2525 let mut decoder = reader.build_decoder().unwrap();
2526 decoder.serialize(json_array.as_slice()).unwrap();
2527 let batch = decoder.flush().unwrap().unwrap();
2528 assert_eq!(
2529 batch,
2530 RecordBatch::try_new(
2531 schema_ref,
2532 vec![
2533 Arc::new(Float64Array::from(vec![
2534 1.0,
2535 2.0,
2536 (i32::MAX as i64 + 10) as f64,
2537 (i64::MAX - 10) as f64
2538 ])),
2539 Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2540 Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2541 ]
2542 )
2543 .unwrap()
2544 );
2545 }
2546
2547 fn _parse_structs(
2552 row: &str,
2553 struct_mode: StructMode,
2554 fields: Fields,
2555 as_struct: bool,
2556 ) -> Result<RecordBatch, ArrowError> {
2557 let builder = if as_struct {
2558 ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2559 } else {
2560 ReaderBuilder::new(Arc::new(Schema::new(fields)))
2561 };
2562 builder
2563 .with_struct_mode(struct_mode)
2564 .build(Cursor::new(row.as_bytes()))
2565 .unwrap()
2566 .next()
2567 .unwrap()
2568 }
2569
2570 #[test]
2571 fn test_struct_decoding_list_length() {
2572 use arrow_array::array;
2573
2574 let row = "[1, 2]";
2575
2576 let mut fields = vec![Field::new("a", DataType::Int32, true)];
2577 let too_few_fields = Fields::from(fields.clone());
2578 fields.push(Field::new("b", DataType::Int32, true));
2579 let correct_fields = Fields::from(fields.clone());
2580 fields.push(Field::new("c", DataType::Int32, true));
2581 let too_many_fields = Fields::from(fields.clone());
2582
2583 let parse = |fields: Fields, as_struct: bool| {
2584 _parse_structs(row, StructMode::ListOnly, fields, as_struct)
2585 };
2586
2587 let expected_row = StructArray::new(
2588 correct_fields.clone(),
2589 vec![
2590 Arc::new(array::Int32Array::from(vec![1])),
2591 Arc::new(array::Int32Array::from(vec![2])),
2592 ],
2593 None,
2594 );
2595 let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);
2596
2597 assert_eq!(
2598 parse(too_few_fields.clone(), true).unwrap_err().to_string(),
2599 "Json error: found extra columns for 1 fields".to_string()
2600 );
2601 assert_eq!(
2602 parse(too_few_fields, false).unwrap_err().to_string(),
2603 "Json error: found extra columns for 1 fields".to_string()
2604 );
2605 assert_eq!(
2606 parse(correct_fields.clone(), true).unwrap(),
2607 RecordBatch::try_new(
2608 Arc::new(Schema::new(vec![row_field])),
2609 vec![Arc::new(expected_row.clone())]
2610 )
2611 .unwrap()
2612 );
2613 assert_eq!(
2614 parse(correct_fields, false).unwrap(),
2615 RecordBatch::from(expected_row)
2616 );
2617 assert_eq!(
2618 parse(too_many_fields.clone(), true)
2619 .unwrap_err()
2620 .to_string(),
2621 "Json error: found 2 columns for 3 fields".to_string()
2622 );
2623 assert_eq!(
2624 parse(too_many_fields, false).unwrap_err().to_string(),
2625 "Json error: found 2 columns for 3 fields".to_string()
2626 );
2627 }
2628
    /// Nested structs decode from JSON objects in `ObjectOnly` mode and from
    /// positional JSON arrays in `ListOnly` mode; presenting the wrong shape
    /// for the active mode is an error, including when the mismatch is on a
    /// nested field rather than the top level.
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        // The same logical row in three encodings: all-object, all-list, and
        // an object whose nested struct value is list-encoded.
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        // Schema for field "a": a struct of a list column "b" and a map "c".
        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // Expected "b" column: a single list [1, 2].
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        // Expected "c" column: the map {"d": 3}, built by hand.
        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly: objects decode; a list is rejected whether it appears
        // at the top level or nested under field 'a'.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly: the mirror image — lists decode, objects are rejected.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
2712
2713 #[test]
2719 fn test_struct_decoding_empty_list() {
2720 let int_field = Field::new("a", DataType::Int32, true);
2721 let struct_field = Field::new(
2722 "r",
2723 DataType::Struct(Fields::from(vec![int_field.clone()])),
2724 true,
2725 );
2726
2727 let parse = |row: &str, as_struct: bool, field: Field| {
2728 _parse_structs(
2729 row,
2730 StructMode::ListOnly,
2731 Fields::from(vec![field]),
2732 as_struct,
2733 )
2734 };
2735
2736 assert_eq!(
2738 parse("[]", true, struct_field.clone())
2739 .unwrap_err()
2740 .to_string(),
2741 "Json error: found 0 columns for 1 fields".to_owned()
2742 );
2743 assert_eq!(
2744 parse("[]", false, int_field.clone())
2745 .unwrap_err()
2746 .to_string(),
2747 "Json error: found 0 columns for 1 fields".to_owned()
2748 );
2749 assert_eq!(
2750 parse("[]", false, struct_field.clone())
2751 .unwrap_err()
2752 .to_string(),
2753 "Json error: found 0 columns for 1 fields".to_owned()
2754 );
2755 assert_eq!(
2756 parse("[[]]", false, struct_field.clone())
2757 .unwrap_err()
2758 .to_string(),
2759 "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2760 );
2761 }
2762
2763 #[test]
2764 fn test_decode_list_struct_with_wrong_types() {
2765 let int_field = Field::new("a", DataType::Int32, true);
2766 let struct_field = Field::new(
2767 "r",
2768 DataType::Struct(Fields::from(vec![int_field.clone()])),
2769 true,
2770 );
2771
2772 let parse = |row: &str, as_struct: bool, field: Field| {
2773 _parse_structs(
2774 row,
2775 StructMode::ListOnly,
2776 Fields::from(vec![field]),
2777 as_struct,
2778 )
2779 };
2780
2781 assert_eq!(
2783 parse(r#"[["a"]]"#, false, struct_field.clone())
2784 .unwrap_err()
2785 .to_string(),
2786 "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2787 );
2788 assert_eq!(
2789 parse(r#"[["a"]]"#, true, struct_field.clone())
2790 .unwrap_err()
2791 .to_string(),
2792 "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2793 );
2794 assert_eq!(
2795 parse(r#"["a"]"#, true, int_field.clone())
2796 .unwrap_err()
2797 .to_string(),
2798 "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2799 );
2800 assert_eq!(
2801 parse(r#"["a"]"#, false, int_field.clone())
2802 .unwrap_err()
2803 .to_string(),
2804 "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2805 );
2806 }
2807}