1use crate::StructMode;
137use crate::reader::binary_array::{
138 BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
139};
140use std::borrow::Cow;
141use std::io::BufRead;
142use std::sync::Arc;
143
144use chrono::Utc;
145use serde_core::Serialize;
146
147use arrow_array::timezone::Tz;
148use arrow_array::types::*;
149use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, make_array};
150use arrow_data::ArrayData;
151use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
152pub use schema::*;
153
154use crate::reader::boolean_array::BooleanArrayDecoder;
155use crate::reader::decimal_array::DecimalArrayDecoder;
156use crate::reader::list_array::ListArrayDecoder;
157use crate::reader::map_array::MapArrayDecoder;
158use crate::reader::null_array::NullArrayDecoder;
159use crate::reader::primitive_array::PrimitiveArrayDecoder;
160use crate::reader::string_array::StringArrayDecoder;
161use crate::reader::string_view_array::StringViewArrayDecoder;
162use crate::reader::struct_array::StructArrayDecoder;
163use crate::reader::tape::{Tape, TapeDecoder};
164use crate::reader::timestamp_array::TimestampArrayDecoder;
165
166mod binary_array;
167mod boolean_array;
168mod decimal_array;
169mod list_array;
170mod map_array;
171mod null_array;
172mod primitive_array;
173mod schema;
174mod serializer;
175mod string_array;
176mod string_view_array;
177mod struct_array;
178mod tape;
179mod timestamp_array;
180
/// Builder for a JSON [`Reader`] / [`Decoder`], configuring batch size,
/// primitive coercion, strict-mode checking and struct decoding behavior.
pub struct ReaderBuilder {
    // Maximum number of rows per emitted RecordBatch (default 1024)
    batch_size: usize,
    // Whether JSON primitives may be coerced into string columns (default false)
    coerce_primitive: bool,
    // Forwarded to the decoders via `DecoderContext` (default false)
    strict_mode: bool,
    // True when built via `new_with_field`: decode a stream of values of a
    // single field rather than objects matching the whole schema
    is_field: bool,
    // How JSON objects map to struct columns; forwarded via `DecoderContext`
    struct_mode: StructMode,

    // Target schema; when `is_field` is set this wraps exactly one field
    schema: SchemaRef,
}
191
192impl ReaderBuilder {
193 pub fn new(schema: SchemaRef) -> Self {
202 Self {
203 batch_size: 1024,
204 coerce_primitive: false,
205 strict_mode: false,
206 is_field: false,
207 struct_mode: Default::default(),
208 schema,
209 }
210 }
211
212 pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
243 Self {
244 batch_size: 1024,
245 coerce_primitive: false,
246 strict_mode: false,
247 is_field: true,
248 struct_mode: Default::default(),
249 schema: Arc::new(Schema::new([field.into()])),
250 }
251 }
252
253 pub fn with_batch_size(self, batch_size: usize) -> Self {
255 Self { batch_size, ..self }
256 }
257
258 pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
261 Self {
262 coerce_primitive,
263 ..self
264 }
265 }
266
267 pub fn with_strict_mode(self, strict_mode: bool) -> Self {
273 Self {
274 strict_mode,
275 ..self
276 }
277 }
278
279 pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
283 Self {
284 struct_mode,
285 ..self
286 }
287 }
288
289 pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
291 Ok(Reader {
292 reader,
293 decoder: self.build_decoder()?,
294 })
295 }
296
297 pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
299 let (data_type, nullable) = if self.is_field {
300 let field = &self.schema.fields[0];
301 let data_type = Cow::Borrowed(field.data_type());
302 (data_type, field.is_nullable())
303 } else {
304 let data_type = Cow::Owned(DataType::Struct(self.schema.fields.clone()));
305 (data_type, false)
306 };
307
308 let ctx = DecoderContext {
309 coerce_primitive: self.coerce_primitive,
310 strict_mode: self.strict_mode,
311 struct_mode: self.struct_mode,
312 };
313 let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?;
314
315 let num_fields = self.schema.flattened_fields().len();
316
317 Ok(Decoder {
318 decoder,
319 is_field: self.is_field,
320 tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
321 batch_size: self.batch_size,
322 schema: self.schema,
323 })
324 }
325}
326
/// A pull-based JSON reader: couples a [`BufRead`] byte source with a
/// push-based [`Decoder`], yielding [`RecordBatch`]es via [`Iterator`].
pub struct Reader<R> {
    // Source of JSON bytes
    reader: R,
    // Push-based decoder fed from `reader`
    decoder: Decoder,
}
334
335impl<R> std::fmt::Debug for Reader<R> {
336 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337 f.debug_struct("Reader")
338 .field("decoder", &self.decoder)
339 .finish()
340 }
341}
342
343impl<R: BufRead> Reader<R> {
344 fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
346 loop {
347 let buf = self.reader.fill_buf()?;
348 if buf.is_empty() {
349 break;
350 }
351 let read = buf.len();
352
353 let decoded = self.decoder.decode(buf)?;
354 self.reader.consume(decoded);
355 if decoded != read {
356 break;
357 }
358 }
359 self.decoder.flush()
360 }
361}
362
363impl<R: BufRead> Iterator for Reader<R> {
364 type Item = Result<RecordBatch, ArrowError>;
365
366 fn next(&mut self) -> Option<Self::Item> {
367 self.read().transpose()
368 }
369}
370
371impl<R: BufRead> RecordBatchReader for Reader<R> {
372 fn schema(&self) -> SchemaRef {
373 self.decoder.schema.clone()
374 }
375}
376
/// A push-based JSON decoder: bytes are fed in via [`Decoder::decode`] (or
/// rows via [`Decoder::serialize`]) and completed batches are retrieved with
/// [`Decoder::flush`].
pub struct Decoder {
    // Buffers raw JSON into an intermediate "tape" representation
    tape_decoder: TapeDecoder,
    // Converts tape rows into ArrayData for the target type
    decoder: Box<dyn ArrayDecoder>,
    // Maximum rows per emitted batch
    batch_size: usize,
    // True when decoding a stream of single-field values (see ReaderBuilder)
    is_field: bool,
    // Schema of emitted batches
    schema: SchemaRef,
}
424
425impl std::fmt::Debug for Decoder {
426 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
427 f.debug_struct("Decoder")
428 .field("schema", &self.schema)
429 .field("batch_size", &self.batch_size)
430 .finish()
431 }
432}
433
impl Decoder {
    /// Pushes JSON bytes into the decoder, returning the number of bytes of
    /// `buf` that were consumed (callers feed any remainder back in later —
    /// see [`Reader::read`]).
    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.tape_decoder.decode(buf)
    }

    /// Serializes `rows` directly onto the tape, bypassing JSON text.
    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
        self.tape_decoder.serialize(rows)
    }

    /// Returns true if a row has been started but not yet completed.
    pub fn has_partial_record(&self) -> bool {
        self.tape_decoder.has_partial_row()
    }

    /// Returns the number of complete rows currently buffered.
    pub fn len(&self) -> usize {
        self.tape_decoder.num_buffered_rows()
    }

    /// Returns true if no complete rows are buffered.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Flushes all buffered rows into a [`RecordBatch`], returning `Ok(None)`
    /// if nothing is buffered.
    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let tape = self.tape_decoder.finish()?;

        if tape.num_rows() == 0 {
            return Ok(None);
        }

        // Walk the tape collecting the start position of each row: `next`
        // returns the position following the current element, so the
        // mem::replace both records the current start and advances the cursor.
        // Position 0 appears reserved by the tape format — rows start at 1.
        let mut next_object = 1;
        let pos: Vec<_> = (0..tape.num_rows())
            .map(|_| {
                let next = tape.next(next_object, "row").unwrap();
                std::mem::replace(&mut next_object, next)
            })
            .collect();

        let decoded = self.decoder.decode(&tape, &pos)?;
        self.tape_decoder.clear();

        // Field-oriented decoding yields a single column; schema-oriented
        // decoding yields a struct that maps directly onto the batch
        let batch = match self.is_field {
            true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
            false => {
                RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
            }
        };

        Ok(Some(batch))
    }
}
677
/// Converts tape elements into [`ArrayData`] of a specific type.
trait ArrayDecoder: Send {
    /// Decodes the elements whose tape start positions are listed in `pos`.
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
682
/// Shared decoding options threaded through [`make_decoder`] so that nested
/// decoders (lists, structs, maps) see the same configuration.
pub struct DecoderContext {
    // Allow JSON primitives to be coerced into string columns
    coerce_primitive: bool,
    // Strict-mode checking flag (consumed by the individual decoders)
    strict_mode: bool,
    // How JSON objects map to struct columns
    struct_mode: StructMode,
}
695
impl DecoderContext {
    /// Returns whether primitive-to-string coercion is enabled.
    pub fn coerce_primitive(&self) -> bool {
        self.coerce_primitive
    }

    /// Returns whether strict-mode checking is enabled.
    pub fn strict_mode(&self) -> bool {
        self.strict_mode
    }

    /// Returns the configured [`StructMode`].
    pub fn struct_mode(&self) -> StructMode {
        self.struct_mode
    }

    /// Builds an [`ArrayDecoder`] for `data_type` using this context's
    /// settings; delegates to the free function [`make_decoder`].
    fn make_decoder(
        &self,
        data_type: &DataType,
        is_nullable: bool,
    ) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
        make_decoder(self, data_type, is_nullable)
    }
}
724
// Expands to a boxed `PrimitiveArrayDecoder` for the given arrow type;
// used both directly and as the macro argument to `downcast_integer!`.
macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}
730
/// Creates the [`ArrayDecoder`] for `data_type`, recursing into nested types
/// (list/struct/map) via `ctx`. Returns `NotYetImplemented` for unsupported
/// types.
fn make_decoder(
    ctx: &DecoderContext,
    data_type: &DataType,
    is_nullable: bool,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    let coerce_primitive = ctx.coerce_primitive();
    // `downcast_integer!` handles all integer types via `primitive_decoder`;
    // the remaining arms dispatch on the non-integer variants
    downcast_integer! {
        *data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        // Timezone-aware timestamps parse the timezone string into a `Tz`
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal32(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal32Type>::new(p, s))),
        DataType::Decimal64(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal64Type>::new(p, s))),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        // Nested types recurse via `ctx` so children share the same options
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(ctx, data_type, is_nullable)?)),
        _ => Err(ArrowError::NotYetImplemented(format!("Support for {data_type} in JSON reader")))
    }
}
800
801#[cfg(test)]
802mod tests {
803 use serde_json::json;
804 use std::fs::File;
805 use std::io::{BufReader, Cursor, Seek};
806
807 use arrow_array::cast::AsArray;
808 use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
809 use arrow_buffer::{ArrowNativeType, Buffer};
810 use arrow_cast::display::{ArrayFormatter, FormatOptions};
811 use arrow_data::ArrayDataBuilder;
812 use arrow_schema::{Field, Fields};
813
814 use super::*;
815
816 fn do_read(
817 buf: &str,
818 batch_size: usize,
819 coerce_primitive: bool,
820 strict_mode: bool,
821 schema: SchemaRef,
822 ) -> Vec<RecordBatch> {
823 let mut unbuffered = vec![];
824
825 for batch_size in [1, 3, 100, batch_size] {
827 unbuffered = ReaderBuilder::new(schema.clone())
828 .with_batch_size(batch_size)
829 .with_coerce_primitive(coerce_primitive)
830 .build(Cursor::new(buf.as_bytes()))
831 .unwrap()
832 .collect::<Result<Vec<_>, _>>()
833 .unwrap();
834
835 for b in unbuffered.iter().take(unbuffered.len() - 1) {
836 assert_eq!(b.num_rows(), batch_size)
837 }
838
839 for b in [1, 3, 5] {
841 let buffered = ReaderBuilder::new(schema.clone())
842 .with_batch_size(batch_size)
843 .with_coerce_primitive(coerce_primitive)
844 .with_strict_mode(strict_mode)
845 .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
846 .unwrap()
847 .collect::<Result<Vec<_>, _>>()
848 .unwrap();
849 assert_eq!(unbuffered, buffered);
850 }
851 }
852
853 unbuffered
854 }
855
    #[test]
    fn test_basic() {
        // Rows exercise out-of-order keys, missing fields, blank lines, an
        // explicit null, and numeric forms (2E0, 2.0, "5", 4e0) decoded into
        // integer/date columns
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Drive the push-based Decoder API directly first
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        // The entire input (221 bytes) is consumed in one call
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        // Flushing drains all buffered rows
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        // Then verify the pull-based Reader path
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
922
    #[test]
    fn test_string() {
        // Exercises escapes (\t, \n), surrogate-pair and \uXXXX escapes,
        // explicit nulls, missing fields, and the empty string
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        // The escape sequences decode to the literal characters
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        // Empty string is distinct from null
        assert_eq!(col2.value(4), "");
    }
958
    #[test]
    fn test_long_string_view_allocation() {
        // Minimum expected size of the view data buffer; presumably the total
        // bytes of the strings too long to be inlined in the view structs —
        // TODO(review): confirm derivation against the StringView inline limit
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Buffer 0 of a StringViewArray holds the out-of-line string bytes
        let data_buffer = string_view_array.to_data().buffers()[0].len();

        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Values round-trip regardless of inline vs buffered storage
        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }
1011
    #[test]
    fn test_numeric_view_allocation() {
        // Minimum expected size of the view data buffer when numbers are
        // coerced into Utf8View — TODO(review): confirm derivation against
        // the StringView inline limit
        let expected_capacity: usize = 33;

        let buf = r#"
        {"n": 123456789}
        {"n": 1000000000000}
        {"n": 3.1415}
        {"n": 2.718281828459045}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        // coerce_primitive = true: numbers become their source text
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Buffer 0 of a StringViewArray holds the out-of-line string bytes
        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Coercion preserves the exact textual representation from the JSON
        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1056
1057 #[test]
1058 fn test_string_with_uft8view() {
1059 let buf = r#"
1060 {"a": "1", "b": "2"}
1061 {"a": "hello", "b": "shoo"}
1062 {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
1063
1064 {"b": null}
1065 {"b": "", "a": null}
1066
1067 "#;
1068 let schema = Arc::new(Schema::new(vec![
1069 Field::new("a", DataType::Utf8View, true),
1070 Field::new("b", DataType::LargeUtf8, true),
1071 ]));
1072
1073 let batches = do_read(buf, 1024, false, false, schema);
1074 assert_eq!(batches.len(), 1);
1075
1076 let col1 = batches[0].column(0).as_string_view();
1077 assert_eq!(col1.null_count(), 2);
1078 assert_eq!(col1.value(0), "1");
1079 assert_eq!(col1.value(1), "hello");
1080 assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
1081 assert!(col1.is_null(3));
1082 assert!(col1.is_null(4));
1083 assert_eq!(col1.data_type(), &DataType::Utf8View);
1084
1085 let col2 = batches[0].column(1).as_string::<i64>();
1086 assert_eq!(col2.null_count(), 1);
1087 assert_eq!(col2.value(0), "2");
1088 assert_eq!(col2.value(1), "shoo");
1089 assert_eq!(col2.value(2), "\t😁foo");
1090 assert!(col2.is_null(3));
1091 assert_eq!(col2.value(4), "");
1092 }
1093
1094 #[test]
1095 fn test_complex() {
1096 let buf = r#"
1097 {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
1098 {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1099 {"list": null, "nested": {"a": null}}
1100 "#;
1101
1102 let schema = Arc::new(Schema::new(vec![
1103 Field::new_list("list", Field::new("element", DataType::Int32, false), true),
1104 Field::new_struct(
1105 "nested",
1106 vec![
1107 Field::new("a", DataType::Int32, true),
1108 Field::new("b", DataType::Int32, true),
1109 ],
1110 true,
1111 ),
1112 Field::new_struct(
1113 "nested_list",
1114 vec![Field::new_list(
1115 "list2",
1116 Field::new_struct(
1117 "element",
1118 vec![Field::new("c", DataType::Int32, false)],
1119 false,
1120 ),
1121 true,
1122 )],
1123 true,
1124 ),
1125 ]));
1126
1127 let batches = do_read(buf, 1024, false, false, schema);
1128 assert_eq!(batches.len(), 1);
1129
1130 let list = batches[0].column(0).as_list::<i32>();
1131 assert_eq!(list.len(), 3);
1132 assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
1133 assert_eq!(list.null_count(), 1);
1134 assert!(list.is_null(2));
1135 let list_values = list.values().as_primitive::<Int32Type>();
1136 assert_eq!(list_values.values(), &[5, 6]);
1137
1138 let nested = batches[0].column(1).as_struct();
1139 let a = nested.column(0).as_primitive::<Int32Type>();
1140 assert_eq!(list.null_count(), 1);
1141 assert_eq!(a.values(), &[1, 7, 0]);
1142 assert!(list.is_null(2));
1143
1144 let b = nested.column(1).as_primitive::<Int32Type>();
1145 assert_eq!(b.null_count(), 2);
1146 assert_eq!(b.len(), 3);
1147 assert_eq!(b.value(0), 2);
1148 assert!(b.is_null(1));
1149 assert!(b.is_null(2));
1150
1151 let nested_list = batches[0].column(2).as_struct();
1152 assert_eq!(nested_list.len(), 3);
1153 assert_eq!(nested_list.null_count(), 1);
1154 assert!(nested_list.is_null(2));
1155
1156 let list2 = nested_list.column(0).as_list::<i32>();
1157 assert_eq!(list2.len(), 3);
1158 assert_eq!(list2.null_count(), 1);
1159 assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
1160 assert!(list2.is_null(2));
1161
1162 let list2_values = list2.values().as_struct();
1163
1164 let c = list2_values.column(0).as_primitive::<Int32Type>();
1165 assert_eq!(c.values(), &[3, 4]);
1166 }
1167
    #[test]
    fn test_projection() {
        // The schema selects only a subset of the JSON fields ("list", "b",
        // and "c" are present in the input but not in the schema)
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // Only the projected child "a" survives in "nested"
        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        // Projected child "d": present only in the first list element
        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1223
    #[test]
    fn test_map() {
        // Maps with string keys and list-of-string values, including null
        // values, empty lists, and repeated keys across rows
        let buf = r#"
        {"map": {"a": ["foo", null]}}
        {"map": {"a": [null], "b": []}}
        {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // Rows contain 1, 2, and 2 entries respectively
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        // Entry "c": null produces a null list value
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        // Round-trip check via the display formatter
        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1266
    #[test]
    fn test_not_coercing_primitive_into_string_without_flag() {
        // Without with_coerce_primitive(true), numbers and booleans must NOT
        // be silently read into Utf8 columns — each case yields a Json error
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));

        // A number in a string column
        let buf = r#"{"a": 1}"#;
        let err = ReaderBuilder::new(schema.clone())
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got 1"
        );

        // A boolean in a string column
        let buf = r#"{"a": true}"#;
        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got true"
        );
    }
1297
    #[test]
    fn test_coercing_primitive_into_string() {
        // With coercion enabled, numbers and booleans are read into Utf8
        // columns preserving their exact textual form (e.g. "2E0", "4e0")
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        // Source text is preserved verbatim, not normalized
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        // Booleans coerce to "true"/"false"; absent fields stay null
        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1346
    /// Shared driver for the decimal tests: decodes numbers and numeric
    /// strings into a decimal `data_type` with scale 2 and checks the scaled
    /// integer representations.
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Values are scaled by 10^2; e.g. "2.0452" truncates to 204
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        // "123.456" truncates to 12345 at scale 2
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1396
    // Runs the shared decimal driver across all four decimal widths
    #[test]
    fn test_decimals() {
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1404
    /// Shared driver for the timestamp tests: decodes a mix of raw numbers
    /// and RFC3339-ish strings (with/without timezone offsets) into naive
    /// (`a`..`c`) and "+08:00"-zoned (`d`) timestamp columns of unit T::UNIT.
    fn test_timestamp<T: ArrowTimestampType>() {
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Expected values are written in nanoseconds and divided down so the
        // same assertions hold for every timestamp unit
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // Raw numbers (40, 1234) pass through unscaled; strings are parsed
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        // Numbers 38.30 / 123.456 truncate to 38 / 123
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        // Zoned column: offset-less strings are interpreted in "+08:00"
        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1486
1487 #[test]
1488 fn test_timestamps() {
1489 test_timestamp::<TimestampSecondType>();
1490 test_timestamp::<TimestampMillisecondType>();
1491 test_timestamp::<TimestampMicrosecondType>();
1492 test_timestamp::<TimestampNanosecondType>();
1493 }
1494
    /// Shared body for [`test_times`]: decodes times-of-day into the
    /// `Time32`/`Time64` type `T` from a mix of JSON encodings.
    fn test_time<T: ArrowTemporalType>() {
        // Rows mix raw integers, 12-hour and 24-hour formatted strings,
        // duplicate keys, explicit nulls, blank lines and missing fields
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        // Extract the time unit from the concrete Arrow type under test
        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Scale factor so expected values (written below in nanoseconds)
        // can be expressed in the unit of `T`
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        // coerce_primitive = true so quoted and fractional numbers are accepted
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Column "a": only the first two rows carry a value
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // Column "b": formatted strings parse; the duplicate key in row 3
        // resolves to the later value ("6:00 pm" == 18:00)
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        // Column "c": floats truncate to their integer part (38.30 -> 38)
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1566
1567 #[test]
1568 fn test_times() {
1569 test_time::<Time32MillisecondType>();
1570 test_time::<Time32SecondType>();
1571 test_time::<Time64MicrosecondType>();
1572 test_time::<Time64NanosecondType>();
1573 }
1574
1575 fn test_duration<T: ArrowTemporalType>() {
1576 let buf = r#"
1577 {"a": 1, "b": "2"}
1578 {"a": 3, "b": null}
1579 "#;
1580
1581 let schema = Arc::new(Schema::new(vec![
1582 Field::new("a", T::DATA_TYPE, true),
1583 Field::new("b", T::DATA_TYPE, true),
1584 ]));
1585
1586 let batches = do_read(buf, 1024, true, false, schema);
1587 assert_eq!(batches.len(), 1);
1588
1589 let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1590 assert_eq!(col_a.null_count(), 0);
1591 assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1592
1593 let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1594 assert_eq!(col2.null_count(), 1);
1595 assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1596 }
1597
1598 #[test]
1599 fn test_durations() {
1600 test_duration::<DurationNanosecondType>();
1601 test_duration::<DurationMicrosecondType>();
1602 test_duration::<DurationMillisecondType>();
1603 test_duration::<DurationSecondType>();
1604 }
1605
1606 #[test]
1607 fn test_delta_checkpoint() {
1608 let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
1609 let schema = Arc::new(Schema::new(vec![
1610 Field::new_struct(
1611 "protocol",
1612 vec![
1613 Field::new("minReaderVersion", DataType::Int32, true),
1614 Field::new("minWriterVersion", DataType::Int32, true),
1615 ],
1616 true,
1617 ),
1618 Field::new_struct(
1619 "add",
1620 vec![Field::new_map(
1621 "partitionValues",
1622 "key_value",
1623 Field::new("key", DataType::Utf8, false),
1624 Field::new("value", DataType::Utf8, true),
1625 false,
1626 false,
1627 )],
1628 true,
1629 ),
1630 ]));
1631
1632 let batches = do_read(json, 1024, true, false, schema);
1633 assert_eq!(batches.len(), 1);
1634
1635 let s: StructArray = batches.into_iter().next().unwrap().into();
1636 let opts = FormatOptions::default().with_null("null");
1637 let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
1638 assert_eq!(
1639 formatter.value(0).to_string(),
1640 "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
1641 );
1642 }
1643
1644 #[test]
1645 fn struct_nullability() {
1646 let do_test = |child: DataType| {
1647 let non_null = r#"{"foo": {}}"#;
1649 let schema = Arc::new(Schema::new(vec![Field::new_struct(
1650 "foo",
1651 vec![Field::new("bar", child, false)],
1652 true,
1653 )]));
1654 let mut reader = ReaderBuilder::new(schema.clone())
1655 .build(Cursor::new(non_null.as_bytes()))
1656 .unwrap();
1657 assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": {bar: null}}"#;
1660 let mut reader = ReaderBuilder::new(schema.clone())
1661 .build(Cursor::new(null.as_bytes()))
1662 .unwrap();
1663 assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": null}"#;
1667 let mut reader = ReaderBuilder::new(schema)
1668 .build(Cursor::new(null.as_bytes()))
1669 .unwrap();
1670 let batch = reader.next().unwrap().unwrap();
1671 assert_eq!(batch.num_columns(), 1);
1672 let foo = batch.column(0).as_struct();
1673 assert_eq!(foo.len(), 1);
1674 assert!(foo.is_null(0));
1675 assert_eq!(foo.num_columns(), 1);
1676
1677 let bar = foo.column(0);
1678 assert_eq!(bar.len(), 1);
1679 assert!(bar.is_null(0));
1681 };
1682
1683 do_test(DataType::Boolean);
1684 do_test(DataType::Int32);
1685 do_test(DataType::Utf8);
1686 do_test(DataType::Decimal128(2, 1));
1687 do_test(DataType::Timestamp(
1688 TimeUnit::Microsecond,
1689 Some("+00:00".into()),
1690 ));
1691 }
1692
1693 #[test]
1694 fn test_truncation() {
1695 let buf = r#"
1696 {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1697 {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1698 {"i64": -9223372036854775808, "u64": 0 }
1699 {"i64": "-9223372036854775808", "u64": 0 }
1700 "#;
1701
1702 let schema = Arc::new(Schema::new(vec![
1703 Field::new("i64", DataType::Int64, true),
1704 Field::new("u64", DataType::UInt64, true),
1705 ]));
1706
1707 let batches = do_read(buf, 1024, true, false, schema);
1708 assert_eq!(batches.len(), 1);
1709
1710 let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1711 assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1712
1713 let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1714 assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1715 }
1716
1717 #[test]
1718 fn test_timestamp_truncation() {
1719 let buf = r#"
1720 {"time": 9223372036854775807 }
1721 {"time": -9223372036854775808 }
1722 {"time": 9e5 }
1723 "#;
1724
1725 let schema = Arc::new(Schema::new(vec![Field::new(
1726 "time",
1727 DataType::Timestamp(TimeUnit::Nanosecond, None),
1728 true,
1729 )]));
1730
1731 let batches = do_read(buf, 1024, true, false, schema);
1732 assert_eq!(batches.len(), 1);
1733
1734 let i64 = batches[0]
1735 .column(0)
1736 .as_primitive::<TimestampNanosecondType>();
1737 assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1738 }
1739
    /// Strict mode succeeds when every JSON key is covered by the schema,
    /// both at the top level and inside nested structs.
    #[test]
    fn test_strict_mode_no_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new("c", DataType::Boolean, false),
        ]));

        // strict_mode = true: every key present in the data is in the schema
        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);

        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct(
                "c",
                vec![
                    Field::new("a", DataType::Boolean, false),
                    Field::new("b", DataType::Int16, false),
                ],
                false,
            ),
        ]));

        // Nested struct keys are likewise fully covered, so this also succeeds
        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);
    }
1777
    /// Strict mode rejects JSON keys absent from the schema, both at the top
    /// level and nested inside a struct, with a descriptive error.
    #[test]
    fn test_strict_mode_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        // Schema deliberately omits "b"
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("c", DataType::Boolean, true),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: column 'b' missing from schema"
        );

        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        // Nested struct "c" omits its child "b"; the error is prefixed with
        // the path of the enclosing field
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'c': column 'b' missing from schema"
        );
    }
1827
1828 fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1829 let file = File::open(path).unwrap();
1830 let mut reader = BufReader::new(file);
1831 let schema = schema.unwrap_or_else(|| {
1832 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1833 reader.rewind().unwrap();
1834 schema
1835 });
1836 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1837 builder.build(reader).unwrap()
1838 }
1839
    /// Smoke test: infer the schema of basic.json and spot-check the decoded
    /// column types and values.
    #[test]
    fn test_json_basic() {
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader and batch must agree on the inferred schema
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Inferred positions and types of the first four columns
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check decoded values in each column
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }
1878
1879 #[test]
1880 fn test_json_empty_projection() {
1881 let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1882 let batch = reader.next().unwrap().unwrap();
1883
1884 assert_eq!(0, batch.num_columns());
1885 assert_eq!(12, batch.num_rows());
1886 }
1887
    /// Null handling with an inferred schema: validity bitmaps reflect the
    /// nulls in basic_nulls.json.
    #[test]
    fn test_json_basic_with_nulls() {
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // Reader and batch must agree on the inferred schema
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Inferred types in the presence of nulls
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check validity at known null/non-null positions
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }
1927
    /// An explicit schema overrides inference, including narrowing "b" to
    /// Float32.
    #[test]
    fn test_json_basic_schema() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Boolean, false),
            Field::new("d", DataType::Utf8, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        // The reader reports the schema it was given, not an inferred one
        let reader_schema = reader.schema();
        assert_eq!(reader_schema.as_ref(), &schema);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float32, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Values decode with the requested types
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(100000000000000, aa.value(11));
        let bb = batch.column(b.0).as_primitive::<Float32Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
    }
1963
1964 #[test]
1965 fn test_json_basic_schema_projection() {
1966 let schema = Schema::new(vec![
1967 Field::new("a", DataType::Int64, true),
1968 Field::new("c", DataType::Boolean, false),
1969 ]);
1970
1971 let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1972 let batch = reader.next().unwrap().unwrap();
1973
1974 assert_eq!(2, batch.num_columns());
1975 assert_eq!(2, batch.schema().fields().len());
1976 assert_eq!(12, batch.num_rows());
1977
1978 assert_eq!(batch.schema().as_ref(), &schema);
1979
1980 let a = schema.column_with_name("a").unwrap();
1981 assert_eq!(0, a.0);
1982 assert_eq!(&DataType::Int64, a.1.data_type());
1983 let c = schema.column_with_name("c").unwrap();
1984 assert_eq!(1, c.0);
1985 assert_eq!(&DataType::Boolean, c.1.data_type());
1986 }
1987
    /// List columns inferred from arrays.json: element types, flattened
    /// values, and nulls inside the lists.
    #[test]
    fn test_json_arrays() {
        let mut reader = read_file("test/data/arrays.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(3, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        // "b" and "c" infer as lists of nullable Float64/Boolean elements
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
            b.1.data_type()
        );
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
            c.1.data_type()
        );
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        assert_eq!(1627668684594000000, aa.value(2));
        // Check the flattened child values of the "b" lists
        let bb = batch.column(b.0).as_list::<i32>();
        let bb = bb.values().as_primitive::<Float64Type>();
        assert_eq!(9, bb.len());
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-6.1, bb.value(5));
        assert!(!bb.is_valid(7));

        let cc = batch
            .column(c.0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let cc = cc.values().as_boolean();
        assert_eq!(6, cc.len());
        assert!(!cc.value(0));
        assert!(!cc.value(4));
        // Element 5 is a null inside a list, not a missing list
        assert!(!cc.is_valid(5));
    }
2035
2036 #[test]
2037 fn test_empty_json_arrays() {
2038 let json_content = r#"
2039 {"items": []}
2040 {"items": null}
2041 {}
2042 "#;
2043
2044 let schema = Arc::new(Schema::new(vec![Field::new(
2045 "items",
2046 DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2047 true,
2048 )]));
2049
2050 let batches = do_read(json_content, 1024, false, false, schema);
2051 assert_eq!(batches.len(), 1);
2052
2053 let col1 = batches[0].column(0).as_list::<i32>();
2054 assert_eq!(col1.null_count(), 2);
2055 assert!(col1.value(0).is_empty());
2056 assert_eq!(col1.value(0).data_type(), &DataType::Null);
2057 assert!(col1.is_null(1));
2058 assert!(col1.is_null(2));
2059 }
2060
    /// Nested lists of empty / null-only inner lists decode with the
    /// expected lengths and no spurious nulls at the outer level.
    #[test]
    fn test_nested_empty_json_arrays() {
        let json_content = r#"
        {"items": [[],[]]}
        {"items": [[null, null],[null]]}
        "#;

        // List<List<Null>> with every level nullable
        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            DataType::List(FieldRef::new(Field::new_list_field(
                DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
                true,
            ))),
            true,
        )]));

        let batches = do_read(json_content, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_list::<i32>();
        assert_eq!(col1.null_count(), 0);
        // Row 0: two empty inner lists
        assert_eq!(col1.value(0).len(), 2);
        assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
        assert!(col1.value(0).as_list::<i32>().value(1).is_empty());

        // Row 1: inner lists of lengths 2 and 1 (containing nulls)
        assert_eq!(col1.value(1).len(), 2);
        assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
        assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
    }
2090
    /// Lists of structs with nested structs: decodes six rows and compares
    /// against an expected array built by hand from buffers and bitmaps.
    #[test]
    fn test_nested_list_json_arrays() {
        // Schema: a: List<Struct<b: Boolean, c: Struct<d: Utf8>>>
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        // Rows cover: populated structs, null "c", null "b"/"d", a null list,
        // an empty list, and a list containing a null struct
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // Build the expected array bottom-up: 7 flattened struct elements.
        // Bitmaps are little-endian: bit i is the validity of element i
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        // 0b00111011: element 2 ("c": null) and the trailing null struct are invalid
        let c = ArrayDataBuilder::new(c_field.data_type().clone())
            .len(7)
            .add_child_data(d.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00111011])))
            .build()
            .unwrap();
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        // 0b00111111: only the struct from the `[null]` row is invalid
        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
            .len(7)
            .add_child_data(b.to_data())
            .add_child_data(c.clone())
            .null_bit_buffer(Some(Buffer::from([0b00111111])))
            .build()
            .unwrap();
        // Offsets [0, 2, 3, 6, 6, 6, 7]: lists of len 2, 1, 3, 0, 0, 1;
        // 0b00110111 marks row 3 ({"a": null}) as the only null list
        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
            .add_child_data(a)
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        let expected = make_array(a_list);

        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        let read: &ListArray = read.as_list::<i32>();
        let expected = expected.as_list::<i32>();
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        assert_eq!(read.nulls(), expected.nulls());
        // Compare the flattened struct children piecewise before comparing whole
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.to_data(), c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        assert_eq!(read, expected);
    }
2184
    /// Whitespace-only lines between records are skipped rather than treated
    /// as rows.
    #[test]
    fn test_skip_empty_lines() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
        // Leading newline and inter-record whitespace must be ignored
        let json_content = "
        {\"a\": 1}
        {\"a\": 2}
        {\"a\": 3}";
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();
        let batch = reader.next().unwrap().unwrap();

        // Exactly the three data rows survive
        assert_eq!(1, batch.num_columns());
        assert_eq!(3, batch.num_rows());

        let schema = reader.schema();
        let c = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, c.1.data_type());
    }
2203
2204 #[test]
2205 fn test_with_multiple_batches() {
2206 let file = File::open("test/data/basic_nulls.json").unwrap();
2207 let mut reader = BufReader::new(file);
2208 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2209 reader.rewind().unwrap();
2210
2211 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2212 let mut reader = builder.build(reader).unwrap();
2213
2214 let mut num_records = Vec::new();
2215 while let Some(rb) = reader.next().transpose().unwrap() {
2216 num_records.push(rb.num_rows());
2217 }
2218
2219 assert_eq!(vec![5, 5, 2], num_records);
2220 }
2221
    /// Integer JSON values decode directly as second-precision timestamps.
    #[test]
    fn test_timestamp_from_json_seconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        // Schema projects down to the single requested column
        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Second, None),
            a.1.data_type()
        );

        // Nulls are preserved; integer values pass through untouched
        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2254
    /// Integer JSON values decode directly as millisecond-precision
    /// timestamps (no unit scaling is applied).
    #[test]
    fn test_timestamp_from_json_milliseconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Millisecond, None),
            a.1.data_type()
        );

        // Same raw values as the seconds test: integers pass through as-is
        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2287
    /// Integer JSON values decode directly as Date64 (milliseconds since
    /// epoch), with nulls preserved.
    #[test]
    fn test_date_from_json_milliseconds() {
        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Date64, a.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Date64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2313
    /// Integer JSON values decode directly as nanosecond Time64, with nulls
    /// preserved.
    #[test]
    fn test_time_from_json_nanoseconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Time64(TimeUnit::Nanosecond),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2343
    /// The reader works as a plain [`Iterator`]: batches stream out until the
    /// input is exhausted and totals add up.
    #[test]
    fn test_json_iterator() {
        let file = File::open("test/data/basic.json").unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
        let reader = builder.build(reader).unwrap();
        let schema = reader.schema();
        let (col_a_index, _) = schema.column_with_name("a").unwrap();

        // Accumulate row counts, batch count and the sum of column "a"
        let mut sum_num_rows = 0;
        let mut num_batches = 0;
        let mut sum_a = 0;
        for batch in reader {
            let batch = batch.unwrap();
            assert_eq!(8, batch.num_columns());
            sum_num_rows += batch.num_rows();
            num_batches += 1;
            let batch_schema = batch.schema();
            assert_eq!(schema, batch_schema);
            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
        }
        // 12 rows at batch size 5 -> 3 batches
        assert_eq!(12, sum_num_rows);
        assert_eq!(3, num_batches);
        assert_eq!(100000000000011, sum_a);
    }
2373
    /// Error reporting: partial rows stay buffered across a failed flush, and
    /// type mismatches yield descriptive, field-qualified messages.
    #[test]
    fn test_decoder_error() {
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "a",
            vec![Field::new("child", DataType::Int32, false)],
            true,
        )]));

        // Feed a truncated row: it must remain buffered, and a failed flush
        // must not discard it
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
        let _ = decoder.flush().unwrap_err();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

        // Helper: decode a single row and return the error message
        let parse_err = |s: &str| {
            ReaderBuilder::new(schema.clone())
                .build(Cursor::new(s.as_bytes()))
                .unwrap()
                .next()
                .unwrap()
                .unwrap_err()
                .to_string()
        };

        // Struct field "a" rejects every non-object value, echoing the input
        let err = parse_err(r#"{"a": 123}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got 123"
        );

        let err = parse_err(r#"{"a": ["bar"]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
        );

        let err = parse_err(r#"{"a": []}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got []"
        );

        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
        );

        // Deeply nested offending values are echoed in full
        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
        );

        let err = parse_err(r#"{"a": true}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got true"
        );

        let err = parse_err(r#"{"a": false}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got false"
        );

        let err = parse_err(r#"{"a": "foo"}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got \"foo\""
        );

        // Errors on nested fields carry the full field path
        let err = parse_err(r#"{"a": {"child": false}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
        );

        let err = parse_err(r#"{"a": {"child": []}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
        );

        let err = parse_err(r#"{"a": {"child": [123]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
        );

        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
        );
    }
2472
2473 #[test]
2474 fn test_serialize_timestamp() {
2475 let json = vec![
2476 json!({"timestamp": 1681319393}),
2477 json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2478 ];
2479 let schema = Schema::new(vec![Field::new(
2480 "timestamp",
2481 DataType::Timestamp(TimeUnit::Second, None),
2482 true,
2483 )]);
2484 let mut decoder = ReaderBuilder::new(Arc::new(schema))
2485 .build_decoder()
2486 .unwrap();
2487 decoder.serialize(&json).unwrap();
2488 let batch = decoder.flush().unwrap().unwrap();
2489 assert_eq!(batch.num_rows(), 2);
2490 assert_eq!(batch.num_columns(), 1);
2491 let values = batch.column(0).as_primitive::<TimestampSecondType>();
2492 assert_eq!(values.values(), &[1681319393, -7200]);
2493 }
2494
2495 #[test]
2496 fn test_serialize_decimal() {
2497 let json = vec![
2498 json!({"decimal": 1.234}),
2499 json!({"decimal": "1.234"}),
2500 json!({"decimal": 1234}),
2501 json!({"decimal": "1234"}),
2502 ];
2503 let schema = Schema::new(vec![Field::new(
2504 "decimal",
2505 DataType::Decimal128(10, 3),
2506 true,
2507 )]);
2508 let mut decoder = ReaderBuilder::new(Arc::new(schema))
2509 .build_decoder()
2510 .unwrap();
2511 decoder.serialize(&json).unwrap();
2512 let batch = decoder.flush().unwrap().unwrap();
2513 assert_eq!(batch.num_rows(), 4);
2514 assert_eq!(batch.num_columns(), 1);
2515 let values = batch.column(0).as_primitive::<Decimal128Type>();
2516 assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2517 }
2518
2519 #[test]
2520 fn test_serde_field() {
2521 let field = Field::new("int", DataType::Int32, true);
2522 let mut decoder = ReaderBuilder::new_with_field(field)
2523 .build_decoder()
2524 .unwrap();
2525 decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2526 let b = decoder.flush().unwrap().unwrap();
2527 let values = b.column(0).as_primitive::<Int32Type>().values();
2528 assert_eq!(values, &[1, 2, 3, 4]);
2529 }
2530
2531 #[test]
2532 fn test_serde_large_numbers() {
2533 let field = Field::new("int", DataType::Int64, true);
2534 let mut decoder = ReaderBuilder::new_with_field(field)
2535 .build_decoder()
2536 .unwrap();
2537
2538 decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2539 let b = decoder.flush().unwrap().unwrap();
2540 let values = b.column(0).as_primitive::<Int64Type>().values();
2541 assert_eq!(values, &[1699148028689, 2, 3, 4]);
2542
2543 let field = Field::new(
2544 "int",
2545 DataType::Timestamp(TimeUnit::Microsecond, None),
2546 true,
2547 );
2548 let mut decoder = ReaderBuilder::new_with_field(field)
2549 .build_decoder()
2550 .unwrap();
2551
2552 decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2553 let b = decoder.flush().unwrap().unwrap();
2554 let values = b
2555 .column(0)
2556 .as_primitive::<TimestampMicrosecondType>()
2557 .values();
2558 assert_eq!(values, &[1699148028689, 2, 3, 4]);
2559 }
2560
2561 #[test]
2562 fn test_coercing_primitive_into_string_decoder() {
2563 let buf = &format!(
2564 r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2565 (i32::MAX as i64 + 10),
2566 i64::MAX - 10
2567 );
2568 let schema = Schema::new(vec![
2569 Field::new("a", DataType::Float64, true),
2570 Field::new("b", DataType::Utf8, true),
2571 Field::new("c", DataType::Utf8, true),
2572 ]);
2573 let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2574 let schema_ref = Arc::new(schema);
2575
2576 let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2578 let mut decoder = reader.build_decoder().unwrap();
2579 decoder.serialize(json_array.as_slice()).unwrap();
2580 let batch = decoder.flush().unwrap().unwrap();
2581 assert_eq!(
2582 batch,
2583 RecordBatch::try_new(
2584 schema_ref,
2585 vec![
2586 Arc::new(Float64Array::from(vec![
2587 1.0,
2588 2.0,
2589 (i32::MAX as i64 + 10) as f64,
2590 (i64::MAX - 10) as f64
2591 ])),
2592 Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2593 Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2594 ]
2595 )
2596 .unwrap()
2597 );
2598 }
2599
2600 fn _parse_structs(
2605 row: &str,
2606 struct_mode: StructMode,
2607 fields: Fields,
2608 as_struct: bool,
2609 ) -> Result<RecordBatch, ArrowError> {
2610 let builder = if as_struct {
2611 ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2612 } else {
2613 ReaderBuilder::new(Arc::new(Schema::new(fields)))
2614 };
2615 builder
2616 .with_struct_mode(struct_mode)
2617 .build(Cursor::new(row.as_bytes()))
2618 .unwrap()
2619 .next()
2620 .unwrap()
2621 }
2622
2623 #[test]
2624 fn test_struct_decoding_list_length() {
2625 use arrow_array::array;
2626
2627 let row = "[1, 2]";
2628
2629 let mut fields = vec![Field::new("a", DataType::Int32, true)];
2630 let too_few_fields = Fields::from(fields.clone());
2631 fields.push(Field::new("b", DataType::Int32, true));
2632 let correct_fields = Fields::from(fields.clone());
2633 fields.push(Field::new("c", DataType::Int32, true));
2634 let too_many_fields = Fields::from(fields.clone());
2635
2636 let parse = |fields: Fields, as_struct: bool| {
2637 _parse_structs(row, StructMode::ListOnly, fields, as_struct)
2638 };
2639
2640 let expected_row = StructArray::new(
2641 correct_fields.clone(),
2642 vec![
2643 Arc::new(array::Int32Array::from(vec![1])),
2644 Arc::new(array::Int32Array::from(vec![2])),
2645 ],
2646 None,
2647 );
2648 let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);
2649
2650 assert_eq!(
2651 parse(too_few_fields.clone(), true).unwrap_err().to_string(),
2652 "Json error: found extra columns for 1 fields".to_string()
2653 );
2654 assert_eq!(
2655 parse(too_few_fields, false).unwrap_err().to_string(),
2656 "Json error: found extra columns for 1 fields".to_string()
2657 );
2658 assert_eq!(
2659 parse(correct_fields.clone(), true).unwrap(),
2660 RecordBatch::try_new(
2661 Arc::new(Schema::new(vec![row_field])),
2662 vec![Arc::new(expected_row.clone())]
2663 )
2664 .unwrap()
2665 );
2666 assert_eq!(
2667 parse(correct_fields, false).unwrap(),
2668 RecordBatch::from(expected_row)
2669 );
2670 assert_eq!(
2671 parse(too_many_fields.clone(), true)
2672 .unwrap_err()
2673 .to_string(),
2674 "Json error: found 2 columns for 3 fields".to_string()
2675 );
2676 assert_eq!(
2677 parse(too_many_fields, false).unwrap_err().to_string(),
2678 "Json error: found 2 columns for 3 fields".to_string()
2679 );
2680 }
2681
    #[test]
    fn test_struct_decoding() {
        // Verifies that StructMode controls which JSON encoding a nested
        // struct accepts: ObjectOnly decodes {…} objects, ListOnly decodes
        // positional […] lists, and each mode rejects the other encoding with
        // a descriptive error naming the offending input.
        use arrow_array::builder;

        // The same logical row {b: [1, 2], c: {d: 3}} in three encodings:
        // fully object-encoded, fully list-encoded, and mixed (object outer,
        // list-encoded struct inside field "a").
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        // Struct field "a" holds a list column "b" and a map column "c".
        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // Expected decoded contents: b = [1, 2].
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        // Expected decoded contents: c = {"d": 3}.
        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly: object encoding succeeds ...
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        // ... list encoding is rejected at the top level ...
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        // ... and the mixed form fails while decoding nested field 'a'.
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly: list encoding succeeds ...
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        // ... and both object-bearing forms are rejected.
        // NOTE(review): the missing comma between `[1, 2]` and `"c"` below is
        // the expected error text (how the decoder echoes the input), not a
        // typo in this test — confirm against the tape's error formatting.
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
2765
2766 #[test]
2772 fn test_struct_decoding_empty_list() {
2773 let int_field = Field::new("a", DataType::Int32, true);
2774 let struct_field = Field::new(
2775 "r",
2776 DataType::Struct(Fields::from(vec![int_field.clone()])),
2777 true,
2778 );
2779
2780 let parse = |row: &str, as_struct: bool, field: Field| {
2781 _parse_structs(
2782 row,
2783 StructMode::ListOnly,
2784 Fields::from(vec![field]),
2785 as_struct,
2786 )
2787 };
2788
2789 assert_eq!(
2791 parse("[]", true, struct_field.clone())
2792 .unwrap_err()
2793 .to_string(),
2794 "Json error: found 0 columns for 1 fields".to_owned()
2795 );
2796 assert_eq!(
2797 parse("[]", false, int_field.clone())
2798 .unwrap_err()
2799 .to_string(),
2800 "Json error: found 0 columns for 1 fields".to_owned()
2801 );
2802 assert_eq!(
2803 parse("[]", false, struct_field.clone())
2804 .unwrap_err()
2805 .to_string(),
2806 "Json error: found 0 columns for 1 fields".to_owned()
2807 );
2808 assert_eq!(
2809 parse("[[]]", false, struct_field.clone())
2810 .unwrap_err()
2811 .to_string(),
2812 "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2813 );
2814 }
2815
2816 #[test]
2817 fn test_decode_list_struct_with_wrong_types() {
2818 let int_field = Field::new("a", DataType::Int32, true);
2819 let struct_field = Field::new(
2820 "r",
2821 DataType::Struct(Fields::from(vec![int_field.clone()])),
2822 true,
2823 );
2824
2825 let parse = |row: &str, as_struct: bool, field: Field| {
2826 _parse_structs(
2827 row,
2828 StructMode::ListOnly,
2829 Fields::from(vec![field]),
2830 as_struct,
2831 )
2832 };
2833
2834 assert_eq!(
2836 parse(r#"[["a"]]"#, false, struct_field.clone())
2837 .unwrap_err()
2838 .to_string(),
2839 "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2840 );
2841 assert_eq!(
2842 parse(r#"[["a"]]"#, true, struct_field.clone())
2843 .unwrap_err()
2844 .to_string(),
2845 "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2846 );
2847 assert_eq!(
2848 parse(r#"["a"]"#, true, int_field.clone())
2849 .unwrap_err()
2850 .to_string(),
2851 "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2852 );
2853 assert_eq!(
2854 parse(r#"["a"]"#, false, int_field.clone())
2855 .unwrap_err()
2856 .to_string(),
2857 "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2858 );
2859 }
2860}