1use crate::StructMode;
137use crate::reader::binary_array::{
138 BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
139};
140use std::io::BufRead;
141use std::sync::Arc;
142
143use chrono::Utc;
144use serde_core::Serialize;
145
146use arrow_array::timezone::Tz;
147use arrow_array::types::*;
148use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, make_array};
149use arrow_data::ArrayData;
150use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
151pub use schema::*;
152
153use crate::reader::boolean_array::BooleanArrayDecoder;
154use crate::reader::decimal_array::DecimalArrayDecoder;
155use crate::reader::list_array::ListArrayDecoder;
156use crate::reader::map_array::MapArrayDecoder;
157use crate::reader::null_array::NullArrayDecoder;
158use crate::reader::primitive_array::PrimitiveArrayDecoder;
159use crate::reader::string_array::StringArrayDecoder;
160use crate::reader::string_view_array::StringViewArrayDecoder;
161use crate::reader::struct_array::StructArrayDecoder;
162use crate::reader::tape::{Tape, TapeDecoder};
163use crate::reader::timestamp_array::TimestampArrayDecoder;
164
165mod binary_array;
166mod boolean_array;
167mod decimal_array;
168mod list_array;
169mod map_array;
170mod null_array;
171mod primitive_array;
172mod schema;
173mod serializer;
174mod string_array;
175mod string_view_array;
176mod struct_array;
177mod tape;
178mod timestamp_array;
179
/// Builds a JSON [`Reader`] or standalone [`Decoder`] targeting an Arrow schema.
pub struct ReaderBuilder {
    /// Maximum number of rows per emitted [`RecordBatch`] (default 1024).
    batch_size: usize,
    /// Forwarded to string decoders; when set, primitive JSON values (numbers,
    /// booleans) are accepted for string columns and stored as their raw text.
    coerce_primitive: bool,
    /// Forwarded to nested decoders; presumably enables stricter schema
    /// conformance checks — NOTE(review): exact semantics live in the
    /// individual array decoder implementations, confirm there.
    strict_mode: bool,
    /// If true, `schema` wraps a single field and top-level JSON values are
    /// decoded directly as that field's type rather than one object per row.
    is_field: bool,
    /// Controls how struct values are parsed from JSON (see [`StructMode`]).
    struct_mode: StructMode,

    /// The Arrow schema the decoded batches will conform to.
    schema: SchemaRef,
}
190
191impl ReaderBuilder {
192 pub fn new(schema: SchemaRef) -> Self {
201 Self {
202 batch_size: 1024,
203 coerce_primitive: false,
204 strict_mode: false,
205 is_field: false,
206 struct_mode: Default::default(),
207 schema,
208 }
209 }
210
211 pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
242 Self {
243 batch_size: 1024,
244 coerce_primitive: false,
245 strict_mode: false,
246 is_field: true,
247 struct_mode: Default::default(),
248 schema: Arc::new(Schema::new([field.into()])),
249 }
250 }
251
252 pub fn with_batch_size(self, batch_size: usize) -> Self {
254 Self { batch_size, ..self }
255 }
256
257 pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
260 Self {
261 coerce_primitive,
262 ..self
263 }
264 }
265
266 pub fn with_strict_mode(self, strict_mode: bool) -> Self {
272 Self {
273 strict_mode,
274 ..self
275 }
276 }
277
278 pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
282 Self {
283 struct_mode,
284 ..self
285 }
286 }
287
288 pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
290 Ok(Reader {
291 reader,
292 decoder: self.build_decoder()?,
293 })
294 }
295
296 pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
298 let (data_type, nullable) = match self.is_field {
299 false => (DataType::Struct(self.schema.fields.clone()), false),
300 true => {
301 let field = &self.schema.fields[0];
302 (field.data_type().clone(), field.is_nullable())
303 }
304 };
305
306 let decoder = make_decoder(
307 data_type,
308 self.coerce_primitive,
309 self.strict_mode,
310 nullable,
311 self.struct_mode,
312 )?;
313
314 let num_fields = self.schema.flattened_fields().len();
315
316 Ok(Decoder {
317 decoder,
318 is_field: self.is_field,
319 tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
320 batch_size: self.batch_size,
321 schema: self.schema,
322 })
323 }
324}
325
/// A JSON reader yielding [`RecordBatch`]es from any [`BufRead`] source.
pub struct Reader<R> {
    /// The underlying buffered byte source.
    reader: R,
    /// Incremental decoder turning JSON bytes into batches.
    decoder: Decoder,
}
333
334impl<R> std::fmt::Debug for Reader<R> {
335 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
336 f.debug_struct("Reader")
337 .field("decoder", &self.decoder)
338 .finish()
339 }
340}
341
342impl<R: BufRead> Reader<R> {
343 fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
345 loop {
346 let buf = self.reader.fill_buf()?;
347 if buf.is_empty() {
348 break;
349 }
350 let read = buf.len();
351
352 let decoded = self.decoder.decode(buf)?;
353 self.reader.consume(decoded);
354 if decoded != read {
355 break;
356 }
357 }
358 self.decoder.flush()
359 }
360}
361
362impl<R: BufRead> Iterator for Reader<R> {
363 type Item = Result<RecordBatch, ArrowError>;
364
365 fn next(&mut self) -> Option<Self::Item> {
366 self.read().transpose()
367 }
368}
369
370impl<R: BufRead> RecordBatchReader for Reader<R> {
371 fn schema(&self) -> SchemaRef {
372 self.decoder.schema.clone()
373 }
374}
375
/// A low-level push-based decoder: feed it JSON bytes (or serializable rows)
/// and flush completed rows into [`RecordBatch`]es.
pub struct Decoder {
    /// Parses raw JSON bytes into an intermediate tape representation.
    tape_decoder: TapeDecoder,
    /// Root decoder converting tape elements into Arrow [`ArrayData`].
    decoder: Box<dyn ArrayDecoder>,
    /// Maximum number of rows per flushed batch.
    batch_size: usize,
    /// True when decoding top-level values of a single field's type rather
    /// than one JSON object per row (see [`ReaderBuilder::new_with_field`]).
    is_field: bool,
    /// Schema of the produced batches.
    schema: SchemaRef,
}
423
424impl std::fmt::Debug for Decoder {
425 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
426 f.debug_struct("Decoder")
427 .field("schema", &self.schema)
428 .field("batch_size", &self.batch_size)
429 .finish()
430 }
431}
432
impl Decoder {
    /// Decodes JSON from `buf`, returning the number of bytes consumed.
    ///
    /// May consume fewer bytes than `buf.len()` once a full batch of rows has
    /// been buffered; call [`Decoder::flush`] and then decode the remainder.
    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.tape_decoder.decode(buf)
    }

    /// Serializes `rows` directly into the decoder's buffer, bypassing JSON
    /// text parsing.
    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
        self.tape_decoder.serialize(rows)
    }

    /// Returns true if decoding stopped mid-row (more bytes are needed to
    /// complete the current record).
    pub fn has_partial_record(&self) -> bool {
        self.tape_decoder.has_partial_row()
    }

    /// Returns the number of complete rows currently buffered.
    pub fn len(&self) -> usize {
        self.tape_decoder.num_buffered_rows()
    }

    /// Returns true if no complete rows are buffered.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Flushes buffered rows into a [`RecordBatch`], returning `Ok(None)` if
    /// nothing has been buffered, and clearing the internal buffer.
    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let tape = self.tape_decoder.finish()?;

        if tape.num_rows() == 0 {
            return Ok(None);
        }

        // Walk the tape once to record the starting element of each row:
        // `tape.next` returns the element following the current row, which
        // becomes the start of the next one (the first row starts at 1).
        let mut next_object = 1;
        let pos: Vec<_> = (0..tape.num_rows())
            .map(|_| {
                let next = tape.next(next_object, "row").unwrap();
                std::mem::replace(&mut next_object, next)
            })
            .collect();

        let decoded = self.decoder.decode(&tape, &pos)?;
        self.tape_decoder.clear();

        // Field mode wraps the single decoded array in a batch; otherwise the
        // decoded struct array converts directly into a batch.
        let batch = match self.is_field {
            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
            false => {
                RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
            }
        };

        Ok(Some(batch))
    }
}
676
/// Decodes tape elements into Arrow [`ArrayData`] for a particular data type.
trait ArrayDecoder: Send {
    /// Decodes the rows whose starting tape indices are given by `pos`.
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
681
/// Expands to a boxed [`PrimitiveArrayDecoder`] for the given Arrow type;
/// used both directly and as the `downcast_integer!` callback in
/// [`make_decoder`].
macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}
687
/// Constructs the [`ArrayDecoder`] tree for `data_type`.
///
/// Integer types are dispatched via `downcast_integer!`, which expands the
/// `primitive_decoder!` macro for each integral type. Nested types (lists,
/// structs, maps) recurse through their decoder constructors, propagating the
/// decode options. Returns [`ArrowError::NotYetImplemented`] for unsupported
/// types.
fn make_decoder(
    data_type: DataType,
    coerce_primitive: bool,
    strict_mode: bool,
    is_nullable: bool,
    struct_mode: StructMode,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    downcast_integer! {
        data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        // Timestamps without a timezone are decoded with a Utc offset
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        // Timezone-aware timestamps parse the timezone string, which may fail
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal32(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal32Type>::new(p, s))),
        DataType::Decimal64(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal64Type>::new(p, s))),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        // Nested types recurse, threading all decode options through
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
        d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in JSON reader")))
    }
}
758
759#[cfg(test)]
760mod tests {
761 use serde_json::json;
762 use std::fs::File;
763 use std::io::{BufReader, Cursor, Seek};
764
765 use arrow_array::cast::AsArray;
766 use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
767 use arrow_buffer::{ArrowNativeType, Buffer};
768 use arrow_cast::display::{ArrayFormatter, FormatOptions};
769 use arrow_data::ArrayDataBuilder;
770 use arrow_schema::{Field, Fields};
771
772 use super::*;
773
774 fn do_read(
775 buf: &str,
776 batch_size: usize,
777 coerce_primitive: bool,
778 strict_mode: bool,
779 schema: SchemaRef,
780 ) -> Vec<RecordBatch> {
781 let mut unbuffered = vec![];
782
783 for batch_size in [1, 3, 100, batch_size] {
785 unbuffered = ReaderBuilder::new(schema.clone())
786 .with_batch_size(batch_size)
787 .with_coerce_primitive(coerce_primitive)
788 .build(Cursor::new(buf.as_bytes()))
789 .unwrap()
790 .collect::<Result<Vec<_>, _>>()
791 .unwrap();
792
793 for b in unbuffered.iter().take(unbuffered.len() - 1) {
794 assert_eq!(b.num_rows(), batch_size)
795 }
796
797 for b in [1, 3, 5] {
799 let buffered = ReaderBuilder::new(schema.clone())
800 .with_batch_size(batch_size)
801 .with_coerce_primitive(coerce_primitive)
802 .with_strict_mode(strict_mode)
803 .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
804 .unwrap()
805 .collect::<Result<Vec<_>, _>>()
806 .unwrap();
807 assert_eq!(unbuffered, buffered);
808 }
809 }
810
811 unbuffered
812 }
813
    // Basic decoding of ints, coercible float literals, booleans and dates,
    // including missing fields, nulls, blank lines and quoted numbers.
    #[test]
    fn test_basic() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        // 6 rows < default batch size, so the entire input is consumed
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
880
    // Utf8 and LargeUtf8 columns, including escape sequences, surrogate-pair
    // unicode escapes, nulls and empty strings.
    #[test]
    fn test_string() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        // Surrogate pair \ud83d\ude00 decodes to 😀; \u00FF to ÿ
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }
916
    // Verifies Utf8View decoding allocates out-of-line buffer space for
    // values longer than the view inline limit (12 bytes per the Arrow spec).
    #[test]
    fn test_long_string_view_allocation() {
        // Lower bound on the out-of-line data buffer; only the strings longer
        // than the inline limit spill into it.
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Buffer 0 of a StringViewArray holds the out-of-line string bytes
        let data_buffer = string_view_array.to_data().buffers()[0].len();

        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }
969
    // Verifies that numbers coerced into Utf8View (coerce_primitive = true)
    // also reserve out-of-line buffer space for long textual representations.
    #[test]
    fn test_numeric_view_allocation() {
        // Lower bound covering the number strings longer than the inline limit
        let expected_capacity: usize = 33;

        let buf = r#"
        {"n": 123456789}
        {"n": 1000000000000}
        {"n": 3.1415}
        {"n": 2.718281828459045}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Numbers are preserved verbatim as text, not reformatted
        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1014
    // Same input as test_string, but column "a" decoded as Utf8View.
    // (The "uft8view" typo is preserved in the test name.)
    #[test]
    fn test_string_with_uft8view() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string_view();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert_eq!(col1.data_type(), &DataType::Utf8View);

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }
1051
    // Nested types: a list column, a struct column, and a struct containing a
    // list of structs — checks offsets, nulls and nested values throughout.
    #[test]
    fn test_complex() {
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        {"list": null, "nested": {"a": null}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
            Field::new_struct(
                "nested",
                vec![
                    Field::new("a", DataType::Int32, true),
                    Field::new("b", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("c", DataType::Int32, false)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let list = batches[0].column(0).as_list::<i32>();
        assert_eq!(list.len(), 3);
        // Rows: empty list, [5, 6], null
        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
        assert_eq!(list.null_count(), 1);
        assert!(list.is_null(2));
        let list_values = list.values().as_primitive::<Int32Type>();
        assert_eq!(list_values.values(), &[5, 6]);

        let nested = batches[0].column(1).as_struct();
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(list.null_count(), 1);
        assert_eq!(a.values(), &[1, 7, 0]);
        assert!(list.is_null(2));

        let b = nested.column(1).as_primitive::<Int32Type>();
        assert_eq!(b.null_count(), 2);
        assert_eq!(b.len(), 3);
        assert_eq!(b.value(0), 2);
        assert!(b.is_null(1));
        assert!(b.is_null(2));

        // "nested_list" is absent in row 3, so the struct itself is null there
        let nested_list = batches[0].column(2).as_struct();
        assert_eq!(nested_list.len(), 3);
        assert_eq!(nested_list.null_count(), 1);
        assert!(nested_list.is_null(2));

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.len(), 3);
        assert_eq!(list2.null_count(), 1);
        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
        assert!(list2.is_null(2));

        let list2_values = list2.values().as_struct();

        let c = list2_values.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[3, 4]);
    }
1125
    // Schema projection: JSON members not present in the target schema are
    // skipped, including inside nested structs and lists of structs.
    #[test]
    fn test_projection() {
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        // Note: "list", "nested.b" and "nested_list.list2.element.c" are not
        // part of this schema and must be ignored during decode
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // "d" present only in the first list element; absent => null
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1181
    // Map columns with list values: checks entry offsets, keys, nested list
    // offsets/nulls, and the formatted rendering of each row.
    #[test]
    fn test_map() {
        let buf = r#"
        {"map": {"a": ["foo", null]}}
        {"map": {"a": [null], "b": []}}
        {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // Rows contribute 1, 2 and 2 entries respectively
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        // The entry "c": null yields a null list value
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1224
    // Without coerce_primitive, numeric and boolean JSON values must be
    // rejected for Utf8 columns with a descriptive error.
    #[test]
    fn test_not_coercing_primitive_into_string_without_flag() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));

        let buf = r#"{"a": 1}"#;
        let err = ReaderBuilder::new(schema.clone())
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got 1"
        );

        let buf = r#"{"a": true}"#;
        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got true"
        );
    }
1255
    // With coerce_primitive, numbers and booleans decode into Utf8 columns as
    // their verbatim source text (e.g. "2E0", "4e0" are not normalized).
    #[test]
    fn test_coercing_primitive_into_string() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1304
    // Generic helper: decodes numeric and quoted decimal strings into the
    // given decimal `data_type` (scale 2 in all callers) and checks the
    // scaled integer representations.
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        // Values scaled by 10^2; extra fractional digits are truncated
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1354
    // Runs the decimal helper over all four decimal widths.
    #[test]
    fn test_decimals() {
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1362
    // Generic helper: decodes timestamps at unit T from RFC3339-ish strings
    // (with and without offsets/timezones) and raw numbers, for both naive
    // columns and a column with a fixed "+08:00" timezone.
    fn test_timestamp<T: ArrowTimestampType>() {
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Expected values below are written in nanoseconds and divided down
        // to the unit under test
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // Duplicate member "b" in row 3: the later value wins
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        // Column "d" has timezone "+08:00": strings without an explicit
        // offset are interpreted in that zone
        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1444
    // Runs the timestamp helper over all four time units.
    #[test]
    fn test_timestamps() {
        test_timestamp::<TimestampSecondType>();
        test_timestamp::<TimestampMillisecondType>();
        test_timestamp::<TimestampMicrosecondType>();
        test_timestamp::<TimestampNanosecondType>();
    }
1452
    // Generic helper: decodes time-of-day values at T's unit from 12-hour and
    // 24-hour strings as well as raw numbers.
    fn test_time<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Expected values below are written in nanoseconds since midnight and
        // divided down to the unit under test
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // Duplicate member "b" in row 3: the later "6:00 pm" value wins
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1524
    /// Runs the generic time-parsing driver for every supported
    /// `Time32`/`Time64` resolution.
    #[test]
    fn test_times() {
        test_time::<Time32MillisecondType>();
        test_time::<Time32SecondType>();
        test_time::<Time64MicrosecondType>();
        test_time::<Time64NanosecondType>();
    }
1532
1533 fn test_duration<T: ArrowTemporalType>() {
1534 let buf = r#"
1535 {"a": 1, "b": "2"}
1536 {"a": 3, "b": null}
1537 "#;
1538
1539 let schema = Arc::new(Schema::new(vec![
1540 Field::new("a", T::DATA_TYPE, true),
1541 Field::new("b", T::DATA_TYPE, true),
1542 ]));
1543
1544 let batches = do_read(buf, 1024, true, false, schema);
1545 assert_eq!(batches.len(), 1);
1546
1547 let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1548 assert_eq!(col_a.null_count(), 0);
1549 assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1550
1551 let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1552 assert_eq!(col2.null_count(), 1);
1553 assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1554 }
1555
    /// Runs the generic duration-parsing driver for every duration resolution.
    #[test]
    fn test_durations() {
        test_duration::<DurationNanosecondType>();
        test_duration::<DurationMicrosecondType>();
        test_duration::<DurationMillisecondType>();
        test_duration::<DurationSecondType>();
    }
1563
1564 #[test]
1565 fn test_delta_checkpoint() {
1566 let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
1567 let schema = Arc::new(Schema::new(vec![
1568 Field::new_struct(
1569 "protocol",
1570 vec![
1571 Field::new("minReaderVersion", DataType::Int32, true),
1572 Field::new("minWriterVersion", DataType::Int32, true),
1573 ],
1574 true,
1575 ),
1576 Field::new_struct(
1577 "add",
1578 vec![Field::new_map(
1579 "partitionValues",
1580 "key_value",
1581 Field::new("key", DataType::Utf8, false),
1582 Field::new("value", DataType::Utf8, true),
1583 false,
1584 false,
1585 )],
1586 true,
1587 ),
1588 ]));
1589
1590 let batches = do_read(json, 1024, true, false, schema);
1591 assert_eq!(batches.len(), 1);
1592
1593 let s: StructArray = batches.into_iter().next().unwrap().into();
1594 let opts = FormatOptions::default().with_null("null");
1595 let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
1596 assert_eq!(
1597 formatter.value(0).to_string(),
1598 "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
1599 );
1600 }
1601
1602 #[test]
1603 fn struct_nullability() {
1604 let do_test = |child: DataType| {
1605 let non_null = r#"{"foo": {}}"#;
1607 let schema = Arc::new(Schema::new(vec![Field::new_struct(
1608 "foo",
1609 vec![Field::new("bar", child, false)],
1610 true,
1611 )]));
1612 let mut reader = ReaderBuilder::new(schema.clone())
1613 .build(Cursor::new(non_null.as_bytes()))
1614 .unwrap();
1615 assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": {bar: null}}"#;
1618 let mut reader = ReaderBuilder::new(schema.clone())
1619 .build(Cursor::new(null.as_bytes()))
1620 .unwrap();
1621 assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": null}"#;
1625 let mut reader = ReaderBuilder::new(schema)
1626 .build(Cursor::new(null.as_bytes()))
1627 .unwrap();
1628 let batch = reader.next().unwrap().unwrap();
1629 assert_eq!(batch.num_columns(), 1);
1630 let foo = batch.column(0).as_struct();
1631 assert_eq!(foo.len(), 1);
1632 assert!(foo.is_null(0));
1633 assert_eq!(foo.num_columns(), 1);
1634
1635 let bar = foo.column(0);
1636 assert_eq!(bar.len(), 1);
1637 assert!(bar.is_null(0));
1639 };
1640
1641 do_test(DataType::Boolean);
1642 do_test(DataType::Int32);
1643 do_test(DataType::Utf8);
1644 do_test(DataType::Decimal128(2, 1));
1645 do_test(DataType::Timestamp(
1646 TimeUnit::Microsecond,
1647 Some("+00:00".into()),
1648 ));
1649 }
1650
1651 #[test]
1652 fn test_truncation() {
1653 let buf = r#"
1654 {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1655 {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1656 {"i64": -9223372036854775808, "u64": 0 }
1657 {"i64": "-9223372036854775808", "u64": 0 }
1658 "#;
1659
1660 let schema = Arc::new(Schema::new(vec![
1661 Field::new("i64", DataType::Int64, true),
1662 Field::new("u64", DataType::UInt64, true),
1663 ]));
1664
1665 let batches = do_read(buf, 1024, true, false, schema);
1666 assert_eq!(batches.len(), 1);
1667
1668 let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1669 assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1670
1671 let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1672 assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1673 }
1674
1675 #[test]
1676 fn test_timestamp_truncation() {
1677 let buf = r#"
1678 {"time": 9223372036854775807 }
1679 {"time": -9223372036854775808 }
1680 {"time": 9e5 }
1681 "#;
1682
1683 let schema = Arc::new(Schema::new(vec![Field::new(
1684 "time",
1685 DataType::Timestamp(TimeUnit::Nanosecond, None),
1686 true,
1687 )]));
1688
1689 let batches = do_read(buf, 1024, true, false, schema);
1690 assert_eq!(batches.len(), 1);
1691
1692 let i64 = batches[0]
1693 .column(0)
1694 .as_primitive::<TimestampNanosecondType>();
1695 assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1696 }
1697
1698 #[test]
1699 fn test_strict_mode_no_missing_columns_in_schema() {
1700 let buf = r#"
1701 {"a": 1, "b": "2", "c": true}
1702 {"a": 2E0, "b": "4", "c": false}
1703 "#;
1704
1705 let schema = Arc::new(Schema::new(vec![
1706 Field::new("a", DataType::Int16, false),
1707 Field::new("b", DataType::Utf8, false),
1708 Field::new("c", DataType::Boolean, false),
1709 ]));
1710
1711 let batches = do_read(buf, 1024, true, true, schema);
1712 assert_eq!(batches.len(), 1);
1713
1714 let buf = r#"
1715 {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1716 {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1717 "#;
1718
1719 let schema = Arc::new(Schema::new(vec![
1720 Field::new("a", DataType::Int16, false),
1721 Field::new("b", DataType::Utf8, false),
1722 Field::new_struct(
1723 "c",
1724 vec![
1725 Field::new("a", DataType::Boolean, false),
1726 Field::new("b", DataType::Int16, false),
1727 ],
1728 false,
1729 ),
1730 ]));
1731
1732 let batches = do_read(buf, 1024, true, true, schema);
1733 assert_eq!(batches.len(), 1);
1734 }
1735
1736 #[test]
1737 fn test_strict_mode_missing_columns_in_schema() {
1738 let buf = r#"
1739 {"a": 1, "b": "2", "c": true}
1740 {"a": 2E0, "b": "4", "c": false}
1741 "#;
1742
1743 let schema = Arc::new(Schema::new(vec![
1744 Field::new("a", DataType::Int16, true),
1745 Field::new("c", DataType::Boolean, true),
1746 ]));
1747
1748 let err = ReaderBuilder::new(schema)
1749 .with_batch_size(1024)
1750 .with_strict_mode(true)
1751 .build(Cursor::new(buf.as_bytes()))
1752 .unwrap()
1753 .read()
1754 .unwrap_err();
1755
1756 assert_eq!(
1757 err.to_string(),
1758 "Json error: column 'b' missing from schema"
1759 );
1760
1761 let buf = r#"
1762 {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1763 {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1764 "#;
1765
1766 let schema = Arc::new(Schema::new(vec![
1767 Field::new("a", DataType::Int16, false),
1768 Field::new("b", DataType::Utf8, false),
1769 Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
1770 ]));
1771
1772 let err = ReaderBuilder::new(schema)
1773 .with_batch_size(1024)
1774 .with_strict_mode(true)
1775 .build(Cursor::new(buf.as_bytes()))
1776 .unwrap()
1777 .read()
1778 .unwrap_err();
1779
1780 assert_eq!(
1781 err.to_string(),
1782 "Json error: whilst decoding field 'c': column 'b' missing from schema"
1783 );
1784 }
1785
1786 fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1787 let file = File::open(path).unwrap();
1788 let mut reader = BufReader::new(file);
1789 let schema = schema.unwrap_or_else(|| {
1790 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1791 reader.rewind().unwrap();
1792 schema
1793 });
1794 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1795 builder.build(reader).unwrap()
1796 }
1797
    /// End-to-end read of test/data/basic.json with an inferred schema:
    /// checks column count/order, inferred types, and sampled values.
    #[test]
    fn test_json_basic() {
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // (index, field) pairs for the first four inferred columns
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check decoded values per column
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }
1836
1837 #[test]
1838 fn test_json_empty_projection() {
1839 let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1840 let batch = reader.next().unwrap().unwrap();
1841
1842 assert_eq!(0, batch.num_columns());
1843 assert_eq!(12, batch.num_rows());
1844 }
1845
    /// Reads test/data/basic_nulls.json with an inferred schema and verifies
    /// the per-column validity of specific slots.
    #[test]
    fn test_json_basic_with_nulls() {
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The reader's schema and the batch's schema must agree
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Inferred types for the four columns
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check validity per column at known null/non-null rows
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }
1885
    /// Reads test/data/basic.json with an explicit schema, overriding the
    /// inferred Float64 for "b" with Float32, and checks the decoded values.
    #[test]
    fn test_json_basic_schema() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Boolean, false),
            Field::new("d", DataType::Utf8, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        // The reader must report exactly the schema it was given
        let reader_schema = reader.schema();
        assert_eq!(reader_schema.as_ref(), &schema);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float32, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Values decode under the caller-supplied types
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(100000000000000, aa.value(11));
        let bb = batch.column(b.0).as_primitive::<Float32Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
    }
1921
    /// Supplying a schema that names only a subset of the file's columns
    /// projects the batch down to just those columns.
    #[test]
    fn test_json_basic_schema_projection() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("c", DataType::Boolean, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(2, batch.num_columns());
        assert_eq!(2, batch.schema().fields().len());
        assert_eq!(12, batch.num_rows());

        // The batch schema is exactly the projection schema
        assert_eq!(batch.schema().as_ref(), &schema);

        // Projected columns keep the positions from the supplied schema
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(1, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
    }
1945
1946 #[test]
1947 fn test_json_arrays() {
1948 let mut reader = read_file("test/data/arrays.json", None);
1949 let batch = reader.next().unwrap().unwrap();
1950
1951 assert_eq!(4, batch.num_columns());
1952 assert_eq!(3, batch.num_rows());
1953
1954 let schema = batch.schema();
1955
1956 let a = schema.column_with_name("a").unwrap();
1957 assert_eq!(&DataType::Int64, a.1.data_type());
1958 let b = schema.column_with_name("b").unwrap();
1959 assert_eq!(
1960 &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
1961 b.1.data_type()
1962 );
1963 let c = schema.column_with_name("c").unwrap();
1964 assert_eq!(
1965 &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
1966 c.1.data_type()
1967 );
1968 let d = schema.column_with_name("d").unwrap();
1969 assert_eq!(&DataType::Utf8, d.1.data_type());
1970
1971 let aa = batch.column(a.0).as_primitive::<Int64Type>();
1972 assert_eq!(1, aa.value(0));
1973 assert_eq!(-10, aa.value(1));
1974 assert_eq!(1627668684594000000, aa.value(2));
1975 let bb = batch.column(b.0).as_list::<i32>();
1976 let bb = bb.values().as_primitive::<Float64Type>();
1977 assert_eq!(9, bb.len());
1978 assert_eq!(2.0, bb.value(0));
1979 assert_eq!(-6.1, bb.value(5));
1980 assert!(!bb.is_valid(7));
1981
1982 let cc = batch
1983 .column(c.0)
1984 .as_any()
1985 .downcast_ref::<ListArray>()
1986 .unwrap();
1987 let cc = cc.values().as_boolean();
1988 assert_eq!(6, cc.len());
1989 assert!(!cc.value(0));
1990 assert!(!cc.value(4));
1991 assert!(!cc.is_valid(5));
1992 }
1993
    /// An empty list decodes as a valid zero-length value, while JSON null
    /// and an absent key both produce null list slots.
    #[test]
    fn test_empty_json_arrays() {
        let json_content = r#"
        {"items": []}
        {"items": null}
        {}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
            true,
        )]));

        let batches = do_read(json_content, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_list::<i32>();
        // Rows 1 ("items": null) and 2 (key absent) are null; row 0 is an
        // empty but valid list
        assert_eq!(col1.null_count(), 2);
        assert!(col1.value(0).is_empty());
        assert_eq!(col1.value(0).data_type(), &DataType::Null);
        assert!(col1.is_null(1));
        assert!(col1.is_null(2));
    }
2018
2019 #[test]
2020 fn test_nested_empty_json_arrays() {
2021 let json_content = r#"
2022 {"items": [[],[]]}
2023 {"items": [[null, null],[null]]}
2024 "#;
2025
2026 let schema = Arc::new(Schema::new(vec![Field::new(
2027 "items",
2028 DataType::List(FieldRef::new(Field::new_list_field(
2029 DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2030 true,
2031 ))),
2032 true,
2033 )]));
2034
2035 let batches = do_read(json_content, 1024, false, false, schema);
2036 assert_eq!(batches.len(), 1);
2037
2038 let col1 = batches[0].column(0).as_list::<i32>();
2039 assert_eq!(col1.null_count(), 0);
2040 assert_eq!(col1.value(0).len(), 2);
2041 assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
2042 assert!(col1.value(0).as_list::<i32>().value(1).is_empty());
2043
2044 assert_eq!(col1.value(1).len(), 2);
2045 assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
2046 assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
2047 }
2048
    /// Round-trips nested `List<Struct<b: Boolean, c: Struct<d: Utf8>>>` data
    /// and compares the decoded column with an equivalent array constructed
    /// by hand via `ArrayDataBuilder`.
    #[test]
    fn test_nested_list_json_arrays() {
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // Expected flattened child data: 7 struct entries across the 6 rows
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        // Validity 0b00111011: entries 2 (`"c": null`) and 6 (the trailing
        // null struct) are null
        let c = ArrayDataBuilder::new(c_field.data_type().clone())
            .len(7)
            .add_child_data(d.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00111011])))
            .build()
            .unwrap();
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        // Validity 0b00111111: only entry 6 (the explicit null element of the
        // last row) is null
        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
            .len(7)
            .add_child_data(b.to_data())
            .add_child_data(c.clone())
            .null_bit_buffer(Some(Buffer::from([0b00111111])))
            .build()
            .unwrap();
        // Offsets [0,2,3,6,6,6,7] partition the 7 structs into 6 rows;
        // validity 0b00110111 marks row 3 ({"a": null}) as null
        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
            .add_child_data(a)
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        let expected = make_array(a_list);

        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        let read: &ListArray = read.as_list::<i32>();
        let expected = expected.as_list::<i32>();
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        assert_eq!(read.nulls(), expected.nulls());
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        // Compare each child column individually, then the arrays as a whole
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.to_data(), c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        assert_eq!(read, expected);
    }
2142
2143 #[test]
2144 fn test_skip_empty_lines() {
2145 let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2146 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
2147 let json_content = "
2148 {\"a\": 1}
2149 {\"a\": 2}
2150 {\"a\": 3}";
2151 let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2152 let batch = reader.next().unwrap().unwrap();
2153
2154 assert_eq!(1, batch.num_columns());
2155 assert_eq!(3, batch.num_rows());
2156
2157 let schema = reader.schema();
2158 let c = schema.column_with_name("a").unwrap();
2159 assert_eq!(&DataType::Int64, c.1.data_type());
2160 }
2161
2162 #[test]
2163 fn test_with_multiple_batches() {
2164 let file = File::open("test/data/basic_nulls.json").unwrap();
2165 let mut reader = BufReader::new(file);
2166 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2167 reader.rewind().unwrap();
2168
2169 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2170 let mut reader = builder.build(reader).unwrap();
2171
2172 let mut num_records = Vec::new();
2173 while let Some(rb) = reader.next().transpose().unwrap() {
2174 num_records.push(rb.num_rows());
2175 }
2176
2177 assert_eq!(vec![5, 5, 2], num_records);
2178 }
2179
    /// Integer JSON values decode directly as second-resolution timestamps
    /// when the schema requests `Timestamp(Second, None)`.
    #[test]
    fn test_timestamp_from_json_seconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Second, None),
            a.1.data_type()
        );

        // Nulls in the source stay null; integers pass through unchanged
        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2212
    /// Integer JSON values decode directly as millisecond-resolution
    /// timestamps when the schema requests `Timestamp(Millisecond, None)`.
    #[test]
    fn test_timestamp_from_json_milliseconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Millisecond, None),
            a.1.data_type()
        );

        // Nulls in the source stay null; integers pass through unchanged
        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2245
    /// Integer JSON values decode as `Date64` (milliseconds since epoch)
    /// when the schema requests it.
    #[test]
    fn test_date_from_json_milliseconds() {
        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Date64, a.1.data_type());

        // Nulls in the source stay null; integers pass through unchanged
        let aa = batch.column(a.0).as_primitive::<Date64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2271
    /// Integer JSON values decode as `Time64(Nanosecond)` when the schema
    /// requests it.
    #[test]
    fn test_time_from_json_nanoseconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Time64(TimeUnit::Nanosecond),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());

        // Nulls in the source stay null; integers pass through unchanged
        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2301
2302 #[test]
2303 fn test_json_iterator() {
2304 let file = File::open("test/data/basic.json").unwrap();
2305 let mut reader = BufReader::new(file);
2306 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2307 reader.rewind().unwrap();
2308
2309 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2310 let reader = builder.build(reader).unwrap();
2311 let schema = reader.schema();
2312 let (col_a_index, _) = schema.column_with_name("a").unwrap();
2313
2314 let mut sum_num_rows = 0;
2315 let mut num_batches = 0;
2316 let mut sum_a = 0;
2317 for batch in reader {
2318 let batch = batch.unwrap();
2319 assert_eq!(8, batch.num_columns());
2320 sum_num_rows += batch.num_rows();
2321 num_batches += 1;
2322 let batch_schema = batch.schema();
2323 assert_eq!(schema, batch_schema);
2324 let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
2325 sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
2326 }
2327 assert_eq!(12, sum_num_rows);
2328 assert_eq!(3, num_batches);
2329 assert_eq!(100000000000011, sum_a);
2330 }
2331
    /// Verifies decoder error behavior: a truncated document is retained as a
    /// buffered partial row even after a failed flush, and type mismatches
    /// produce descriptive, path-qualified error messages.
    #[test]
    fn test_decoder_error() {
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "a",
            vec![Field::new("child", DataType::Int32, false)],
            true,
        )]));

        // Feed an incomplete document: the partial row must survive both the
        // decode and the failed flush
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
        let _ = decoder.flush().unwrap_err();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

        // Helper: decode `s` against the struct schema and return the error text
        let parse_err = |s: &str| {
            ReaderBuilder::new(schema.clone())
                .build(Cursor::new(s.as_bytes()))
                .unwrap()
                .next()
                .unwrap()
                .unwrap_err()
                .to_string()
        };

        // Non-object values where a struct is expected
        let err = parse_err(r#"{"a": 123}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got 123"
        );

        let err = parse_err(r#"{"a": ["bar"]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
        );

        let err = parse_err(r#"{"a": []}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got []"
        );

        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
        );

        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
        );

        let err = parse_err(r#"{"a": true}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got true"
        );

        let err = parse_err(r#"{"a": false}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got false"
        );

        let err = parse_err(r#"{"a": "foo"}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got \"foo\""
        );

        // Non-primitive values where the Int32 child is expected; the error
        // path includes both field names
        let err = parse_err(r#"{"a": {"child": false}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
        );

        let err = parse_err(r#"{"a": {"child": []}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
        );

        let err = parse_err(r#"{"a": {"child": [123]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
        );

        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
        );
    }
2430
2431 #[test]
2432 fn test_serialize_timestamp() {
2433 let json = vec![
2434 json!({"timestamp": 1681319393}),
2435 json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2436 ];
2437 let schema = Schema::new(vec![Field::new(
2438 "timestamp",
2439 DataType::Timestamp(TimeUnit::Second, None),
2440 true,
2441 )]);
2442 let mut decoder = ReaderBuilder::new(Arc::new(schema))
2443 .build_decoder()
2444 .unwrap();
2445 decoder.serialize(&json).unwrap();
2446 let batch = decoder.flush().unwrap().unwrap();
2447 assert_eq!(batch.num_rows(), 2);
2448 assert_eq!(batch.num_columns(), 1);
2449 let values = batch.column(0).as_primitive::<TimestampSecondType>();
2450 assert_eq!(values.values(), &[1681319393, -7200]);
2451 }
2452
    /// `serialize` scales numeric and string inputs to `Decimal128(10, 3)`:
    /// fractional inputs keep their 3 digits of scale (1.234 -> 1234) and
    /// integral inputs are multiplied by 10^3 (1234 -> 1234000).
    #[test]
    fn test_serialize_decimal() {
        let json = vec![
            json!({"decimal": 1.234}),
            json!({"decimal": "1.234"}),
            json!({"decimal": 1234}),
            json!({"decimal": "1234"}),
        ];
        let schema = Schema::new(vec![Field::new(
            "decimal",
            DataType::Decimal128(10, 3),
            true,
        )]);
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(batch.num_columns(), 1);
        let values = batch.column(0).as_primitive::<Decimal128Type>();
        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
    }
2476
2477 #[test]
2478 fn test_serde_field() {
2479 let field = Field::new("int", DataType::Int32, true);
2480 let mut decoder = ReaderBuilder::new_with_field(field)
2481 .build_decoder()
2482 .unwrap();
2483 decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2484 let b = decoder.flush().unwrap().unwrap();
2485 let values = b.column(0).as_primitive::<Int32Type>().values();
2486 assert_eq!(values, &[1, 2, 3, 4]);
2487 }
2488
2489 #[test]
2490 fn test_serde_large_numbers() {
2491 let field = Field::new("int", DataType::Int64, true);
2492 let mut decoder = ReaderBuilder::new_with_field(field)
2493 .build_decoder()
2494 .unwrap();
2495
2496 decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2497 let b = decoder.flush().unwrap().unwrap();
2498 let values = b.column(0).as_primitive::<Int64Type>().values();
2499 assert_eq!(values, &[1699148028689, 2, 3, 4]);
2500
2501 let field = Field::new(
2502 "int",
2503 DataType::Timestamp(TimeUnit::Microsecond, None),
2504 true,
2505 );
2506 let mut decoder = ReaderBuilder::new_with_field(field)
2507 .build_decoder()
2508 .unwrap();
2509
2510 decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2511 let b = decoder.flush().unwrap().unwrap();
2512 let values = b
2513 .column(0)
2514 .as_primitive::<TimestampMicrosecondType>()
2515 .values();
2516 assert_eq!(values, &[1699148028689, 2, 3, 4]);
2517 }
2518
2519 #[test]
2520 fn test_coercing_primitive_into_string_decoder() {
2521 let buf = &format!(
2522 r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2523 (i32::MAX as i64 + 10),
2524 i64::MAX - 10
2525 );
2526 let schema = Schema::new(vec![
2527 Field::new("a", DataType::Float64, true),
2528 Field::new("b", DataType::Utf8, true),
2529 Field::new("c", DataType::Utf8, true),
2530 ]);
2531 let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2532 let schema_ref = Arc::new(schema);
2533
2534 let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2536 let mut decoder = reader.build_decoder().unwrap();
2537 decoder.serialize(json_array.as_slice()).unwrap();
2538 let batch = decoder.flush().unwrap().unwrap();
2539 assert_eq!(
2540 batch,
2541 RecordBatch::try_new(
2542 schema_ref,
2543 vec![
2544 Arc::new(Float64Array::from(vec![
2545 1.0,
2546 2.0,
2547 (i32::MAX as i64 + 10) as f64,
2548 (i64::MAX - 10) as f64
2549 ])),
2550 Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2551 Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2552 ]
2553 )
2554 .unwrap()
2555 );
2556 }
2557
2558 fn _parse_structs(
2563 row: &str,
2564 struct_mode: StructMode,
2565 fields: Fields,
2566 as_struct: bool,
2567 ) -> Result<RecordBatch, ArrowError> {
2568 let builder = if as_struct {
2569 ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2570 } else {
2571 ReaderBuilder::new(Arc::new(Schema::new(fields)))
2572 };
2573 builder
2574 .with_struct_mode(struct_mode)
2575 .build(Cursor::new(row.as_bytes()))
2576 .unwrap()
2577 .next()
2578 .unwrap()
2579 }
2580
2581 #[test]
2582 fn test_struct_decoding_list_length() {
2583 use arrow_array::array;
2584
2585 let row = "[1, 2]";
2586
2587 let mut fields = vec![Field::new("a", DataType::Int32, true)];
2588 let too_few_fields = Fields::from(fields.clone());
2589 fields.push(Field::new("b", DataType::Int32, true));
2590 let correct_fields = Fields::from(fields.clone());
2591 fields.push(Field::new("c", DataType::Int32, true));
2592 let too_many_fields = Fields::from(fields.clone());
2593
2594 let parse = |fields: Fields, as_struct: bool| {
2595 _parse_structs(row, StructMode::ListOnly, fields, as_struct)
2596 };
2597
2598 let expected_row = StructArray::new(
2599 correct_fields.clone(),
2600 vec![
2601 Arc::new(array::Int32Array::from(vec![1])),
2602 Arc::new(array::Int32Array::from(vec![2])),
2603 ],
2604 None,
2605 );
2606 let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);
2607
2608 assert_eq!(
2609 parse(too_few_fields.clone(), true).unwrap_err().to_string(),
2610 "Json error: found extra columns for 1 fields".to_string()
2611 );
2612 assert_eq!(
2613 parse(too_few_fields, false).unwrap_err().to_string(),
2614 "Json error: found extra columns for 1 fields".to_string()
2615 );
2616 assert_eq!(
2617 parse(correct_fields.clone(), true).unwrap(),
2618 RecordBatch::try_new(
2619 Arc::new(Schema::new(vec![row_field])),
2620 vec![Arc::new(expected_row.clone())]
2621 )
2622 .unwrap()
2623 );
2624 assert_eq!(
2625 parse(correct_fields, false).unwrap(),
2626 RecordBatch::from(expected_row)
2627 );
2628 assert_eq!(
2629 parse(too_many_fields.clone(), true)
2630 .unwrap_err()
2631 .to_string(),
2632 "Json error: found 2 columns for 3 fields".to_string()
2633 );
2634 assert_eq!(
2635 parse(too_many_fields, false).unwrap_err().to_string(),
2636 "Json error: found 2 columns for 3 fields".to_string()
2637 );
2638 }
2639
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        // The same logical row {a: {b: [1,2], c: {d:3}}} encoded three ways:
        // as a pure object, as a pure list, and as an object whose struct
        // value is a list. Each StructMode accepts exactly one encoding.
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        // Inner struct: "b" is a list<i32>, "c" is a map<utf8, i32>.
        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // Expected column values: b = [1, 2] ...
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        // ... and c = {"d": 3}.
        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly: accepts the object encoding, rejects lists at both the
        // top level and nested inside field 'a'.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly: the mirror image — accepts the list encoding, rejects any
        // object form (note the error echoes the offending input).
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
2723
2724 #[test]
2730 fn test_struct_decoding_empty_list() {
2731 let int_field = Field::new("a", DataType::Int32, true);
2732 let struct_field = Field::new(
2733 "r",
2734 DataType::Struct(Fields::from(vec![int_field.clone()])),
2735 true,
2736 );
2737
2738 let parse = |row: &str, as_struct: bool, field: Field| {
2739 _parse_structs(
2740 row,
2741 StructMode::ListOnly,
2742 Fields::from(vec![field]),
2743 as_struct,
2744 )
2745 };
2746
2747 assert_eq!(
2749 parse("[]", true, struct_field.clone())
2750 .unwrap_err()
2751 .to_string(),
2752 "Json error: found 0 columns for 1 fields".to_owned()
2753 );
2754 assert_eq!(
2755 parse("[]", false, int_field.clone())
2756 .unwrap_err()
2757 .to_string(),
2758 "Json error: found 0 columns for 1 fields".to_owned()
2759 );
2760 assert_eq!(
2761 parse("[]", false, struct_field.clone())
2762 .unwrap_err()
2763 .to_string(),
2764 "Json error: found 0 columns for 1 fields".to_owned()
2765 );
2766 assert_eq!(
2767 parse("[[]]", false, struct_field.clone())
2768 .unwrap_err()
2769 .to_string(),
2770 "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2771 );
2772 }
2773
2774 #[test]
2775 fn test_decode_list_struct_with_wrong_types() {
2776 let int_field = Field::new("a", DataType::Int32, true);
2777 let struct_field = Field::new(
2778 "r",
2779 DataType::Struct(Fields::from(vec![int_field.clone()])),
2780 true,
2781 );
2782
2783 let parse = |row: &str, as_struct: bool, field: Field| {
2784 _parse_structs(
2785 row,
2786 StructMode::ListOnly,
2787 Fields::from(vec![field]),
2788 as_struct,
2789 )
2790 };
2791
2792 assert_eq!(
2794 parse(r#"[["a"]]"#, false, struct_field.clone())
2795 .unwrap_err()
2796 .to_string(),
2797 "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2798 );
2799 assert_eq!(
2800 parse(r#"[["a"]]"#, true, struct_field.clone())
2801 .unwrap_err()
2802 .to_string(),
2803 "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2804 );
2805 assert_eq!(
2806 parse(r#"["a"]"#, true, int_field.clone())
2807 .unwrap_err()
2808 .to_string(),
2809 "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2810 );
2811 assert_eq!(
2812 parse(r#"["a"]"#, false, int_field.clone())
2813 .unwrap_err()
2814 .to_string(),
2815 "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2816 );
2817 }
2818}