1use crate::StructMode;
137use crate::reader::binary_array::{
138 BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
139};
140use std::borrow::Cow;
141use std::io::BufRead;
142use std::sync::Arc;
143
144use chrono::Utc;
145use serde_core::Serialize;
146
147use arrow_array::timezone::Tz;
148use arrow_array::types::*;
149use arrow_array::{RecordBatch, RecordBatchReader, StructArray, downcast_integer, make_array};
150use arrow_data::ArrayData;
151use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
152pub use schema::*;
153pub use value_iter::ValueIter;
154
155use crate::reader::boolean_array::BooleanArrayDecoder;
156use crate::reader::decimal_array::DecimalArrayDecoder;
157use crate::reader::list_array::ListArrayDecoder;
158use crate::reader::map_array::MapArrayDecoder;
159use crate::reader::null_array::NullArrayDecoder;
160use crate::reader::primitive_array::PrimitiveArrayDecoder;
161use crate::reader::run_end_array::RunEndEncodedArrayDecoder;
162use crate::reader::string_array::StringArrayDecoder;
163use crate::reader::string_view_array::StringViewArrayDecoder;
164use crate::reader::struct_array::StructArrayDecoder;
165use crate::reader::tape::{Tape, TapeDecoder};
166use crate::reader::timestamp_array::TimestampArrayDecoder;
167
168mod binary_array;
169mod boolean_array;
170mod decimal_array;
171mod list_array;
172mod map_array;
173mod null_array;
174mod primitive_array;
175mod run_end_array;
176mod schema;
177mod serializer;
178mod string_array;
179mod string_view_array;
180mod struct_array;
181mod tape;
182mod timestamp_array;
183mod value_iter;
184
/// A builder for constructing a [`Reader`] or [`Decoder`] from a schema.
pub struct ReaderBuilder {
    // Maximum number of rows per emitted RecordBatch (default 1024).
    batch_size: usize,
    // When true, primitives may be coerced into string columns (see `with_coerce_primitive`).
    coerce_primitive: bool,
    // Forwarded to the decoders via `DecoderContext` (see `with_strict_mode`).
    strict_mode: bool,
    // True when built via `new_with_field`: decode a single top-level field
    // instead of treating each row as a struct of the schema's fields.
    is_field: bool,
    // How JSON objects map onto struct values (see `with_struct_mode`).
    struct_mode: StructMode,

    schema: SchemaRef,
}
195
196impl ReaderBuilder {
197 pub fn new(schema: SchemaRef) -> Self {
206 Self {
207 batch_size: 1024,
208 coerce_primitive: false,
209 strict_mode: false,
210 is_field: false,
211 struct_mode: Default::default(),
212 schema,
213 }
214 }
215
216 pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
247 Self {
248 batch_size: 1024,
249 coerce_primitive: false,
250 strict_mode: false,
251 is_field: true,
252 struct_mode: Default::default(),
253 schema: Arc::new(Schema::new([field.into()])),
254 }
255 }
256
257 pub fn with_batch_size(self, batch_size: usize) -> Self {
259 Self { batch_size, ..self }
260 }
261
262 pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
265 Self {
266 coerce_primitive,
267 ..self
268 }
269 }
270
271 pub fn with_strict_mode(self, strict_mode: bool) -> Self {
277 Self {
278 strict_mode,
279 ..self
280 }
281 }
282
283 pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
287 Self {
288 struct_mode,
289 ..self
290 }
291 }
292
293 pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
295 Ok(Reader {
296 reader,
297 decoder: self.build_decoder()?,
298 })
299 }
300
301 pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
303 let (data_type, nullable) = if self.is_field {
304 let field = &self.schema.fields[0];
305 let data_type = Cow::Borrowed(field.data_type());
306 (data_type, field.is_nullable())
307 } else {
308 let data_type = Cow::Owned(DataType::Struct(self.schema.fields.clone()));
309 (data_type, false)
310 };
311
312 let ctx = DecoderContext {
313 coerce_primitive: self.coerce_primitive,
314 strict_mode: self.strict_mode,
315 struct_mode: self.struct_mode,
316 };
317 let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?;
318
319 let num_fields = self.schema.flattened_fields().len();
320
321 Ok(Decoder {
322 decoder,
323 is_field: self.is_field,
324 tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
325 batch_size: self.batch_size,
326 schema: self.schema,
327 })
328 }
329}
330
/// A [`RecordBatch`] reader that pulls JSON bytes from a [`BufRead`] source
/// and decodes them via an owned [`Decoder`].
pub struct Reader<R> {
    reader: R,
    decoder: Decoder,
}
338
339impl<R> std::fmt::Debug for Reader<R> {
340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341 f.debug_struct("Reader")
342 .field("decoder", &self.decoder)
343 .finish()
344 }
345}
346
347impl<R: BufRead> Reader<R> {
348 fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
350 loop {
351 let buf = self.reader.fill_buf()?;
352 if buf.is_empty() {
353 break;
354 }
355 let read = buf.len();
356
357 let decoded = self.decoder.decode(buf)?;
358 self.reader.consume(decoded);
359 if decoded != read {
360 break;
361 }
362 }
363 self.decoder.flush()
364 }
365}
366
367impl<R: BufRead> Iterator for Reader<R> {
368 type Item = Result<RecordBatch, ArrowError>;
369
370 fn next(&mut self) -> Option<Self::Item> {
371 self.read().transpose()
372 }
373}
374
375impl<R: BufRead> RecordBatchReader for Reader<R> {
376 fn schema(&self) -> SchemaRef {
377 self.decoder.schema.clone()
378 }
379}
380
/// A push-based decoder that converts buffered JSON bytes into [`RecordBatch`]es.
pub struct Decoder {
    // Buffers raw JSON input into a tape of structural tokens.
    tape_decoder: TapeDecoder,
    // Root array decoder built from the schema (struct or single field).
    decoder: Box<dyn ArrayDecoder>,
    // Configured rows per batch; also seeds `TapeDecoder::new`.
    batch_size: usize,
    // True when decoding a single top-level field rather than row objects.
    is_field: bool,
    schema: SchemaRef,
}
428
impl std::fmt::Debug for Decoder {
    // Only the configuration fields are reported; the tape and array decoders
    // have no Debug representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Decoder")
            .field("schema", &self.schema)
            .field("batch_size", &self.batch_size)
            .finish()
    }
}
437
438impl Decoder {
439 pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
448 self.tape_decoder.decode(buf)
449 }
450
451 pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
628 self.tape_decoder.serialize(rows)
629 }
630
631 pub fn has_partial_record(&self) -> bool {
633 self.tape_decoder.has_partial_row()
634 }
635
636 pub fn len(&self) -> usize {
638 self.tape_decoder.num_buffered_rows()
639 }
640
641 pub fn is_empty(&self) -> bool {
643 self.len() == 0
644 }
645
646 pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
653 let tape = self.tape_decoder.finish()?;
654
655 if tape.num_rows() == 0 {
656 return Ok(None);
657 }
658
659 let mut next_object = 1;
661 let pos: Vec<_> = (0..tape.num_rows())
662 .map(|_| {
663 let next = tape.next(next_object, "row").unwrap();
664 std::mem::replace(&mut next_object, next)
665 })
666 .collect();
667
668 let decoded = self.decoder.decode(&tape, &pos)?;
669 self.tape_decoder.clear();
670
671 let batch = match self.is_field {
672 true => RecordBatch::try_new(self.schema.clone(), vec![make_array(decoded)])?,
673 false => {
674 RecordBatch::from(StructArray::from(decoded)).with_schema(self.schema.clone())?
675 }
676 };
677
678 Ok(Some(batch))
679 }
680}
681
/// Decodes values out of a [`Tape`] into Arrow [`ArrayData`].
trait ArrayDecoder: Send {
    /// Decode the elements located at tape offsets `pos`, returning the
    /// resulting [`ArrayData`].
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError>;
}
686
/// Options shared by the tree of [`ArrayDecoder`]s built for a schema.
pub struct DecoderContext {
    // See `ReaderBuilder::with_coerce_primitive`.
    coerce_primitive: bool,
    // See `ReaderBuilder::with_strict_mode`.
    strict_mode: bool,
    // See `ReaderBuilder::with_struct_mode`.
    struct_mode: StructMode,
}
699
impl DecoderContext {
    /// Returns whether primitives may be coerced into string columns
    /// (see [`ReaderBuilder::with_coerce_primitive`]).
    pub fn coerce_primitive(&self) -> bool {
        self.coerce_primitive
    }

    /// Returns the configured strict-mode flag
    /// (see [`ReaderBuilder::with_strict_mode`]).
    pub fn strict_mode(&self) -> bool {
        self.strict_mode
    }

    /// Returns how JSON objects map onto struct values
    /// (see [`ReaderBuilder::with_struct_mode`]).
    pub fn struct_mode(&self) -> StructMode {
        self.struct_mode
    }

    /// Build the [`ArrayDecoder`] for `data_type`, carrying this context's
    /// options down to nested decoders.
    fn make_decoder(
        &self,
        data_type: &DataType,
        is_nullable: bool,
    ) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
        make_decoder(self, data_type, is_nullable)
    }
}
728
/// Constructs a boxed [`PrimitiveArrayDecoder`] for the given Arrow primitive
/// type `$t`, wrapped in `Ok` for use inside `make_decoder`'s match.
macro_rules! primitive_decoder {
    ($t:ty, $data_type:expr) => {
        Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
    };
}
734
/// Create the [`ArrayDecoder`] for `data_type`, recursing into nested types
/// via `ctx`. Returns `NotYetImplemented` for unsupported types.
fn make_decoder(
    ctx: &DecoderContext,
    data_type: &DataType,
    is_nullable: bool,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    let coerce_primitive = ctx.coerce_primitive();
    // `downcast_integer!` expands an arm per integer type dispatching to
    // `primitive_decoder!`; all remaining types are matched explicitly below.
    downcast_integer! {
        *data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        // Timestamps without a timezone are decoded relative to UTC
        DataType::Timestamp(TimeUnit::Second, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
        },
        // With a timezone, the string is parsed into a `Tz`; parse failure is
        // surfaced as an error rather than a panic
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal32(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal32Type>::new(p, s))),
        DataType::Decimal64(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal64Type>::new(p, s))),
        DataType::Decimal128(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
        DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
        DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::RunEndEncoded(ref r, _) => match r.data_type() {
            DataType::Int16 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int16Type>::new(ctx, data_type, is_nullable)?)),
            DataType::Int32 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int32Type>::new(ctx, data_type, is_nullable)?)),
            DataType::Int64 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int64Type>::new(ctx, data_type, is_nullable)?)),
            // NOTE(review): panics rather than erroring - presumably run-end
            // index types are validated at DataType construction; confirm.
            d => unreachable!("unsupported run end index type: {d}"),
        },
        _ => Err(ArrowError::NotYetImplemented(format!("Support for {data_type} in JSON reader")))
    }
}
810
811#[cfg(test)]
812mod tests {
813 use serde_json::json;
814 use std::fs::File;
815 use std::io::{BufReader, Cursor, Seek};
816
817 use arrow_array::cast::AsArray;
818 use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
819 use arrow_buffer::{ArrowNativeType, Buffer};
820 use arrow_cast::display::{ArrayFormatter, FormatOptions};
821 use arrow_data::ArrayDataBuilder;
822 use arrow_schema::{Field, Fields};
823
824 use super::*;
825
826 fn do_read(
827 buf: &str,
828 batch_size: usize,
829 coerce_primitive: bool,
830 strict_mode: bool,
831 schema: SchemaRef,
832 ) -> Vec<RecordBatch> {
833 let mut unbuffered = vec![];
834
835 for batch_size in [1, 3, 100, batch_size] {
837 unbuffered = ReaderBuilder::new(schema.clone())
838 .with_batch_size(batch_size)
839 .with_coerce_primitive(coerce_primitive)
840 .build(Cursor::new(buf.as_bytes()))
841 .unwrap()
842 .collect::<Result<Vec<_>, _>>()
843 .unwrap();
844
845 for b in unbuffered.iter().take(unbuffered.len() - 1) {
846 assert_eq!(b.num_rows(), batch_size)
847 }
848
849 for b in [1, 3, 5] {
851 let buffered = ReaderBuilder::new(schema.clone())
852 .with_batch_size(batch_size)
853 .with_coerce_primitive(coerce_primitive)
854 .with_strict_mode(strict_mode)
855 .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
856 .unwrap()
857 .collect::<Result<Vec<_>, _>>()
858 .unwrap();
859 assert_eq!(unbuffered, buffered);
860 }
861 }
862
863 unbuffered
864 }
865
    #[test]
    fn test_basic() {
        // Mixed-order fields, missing fields, blank lines, and numeric
        // formats (2E0, 4e0) coerced into integer/date columns.
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Push-based Decoder API: everything consumed in one call.
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        // 221 = the full buffer, including surrounding whitespace.
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        // Flushing resets the decoder state.
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
932
    #[test]
    fn test_string() {
        // Escapes, surrogate pairs, \uXXXX sequences, nulls and empty strings.
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        // Surrogate pair \ud83d\ude00 decodes to 😀; \u00FF to ÿ.
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        // The empty string is distinct from null.
        assert_eq!(col2.value(4), "");
    }
968
    #[test]
    fn test_long_string_view_allocation() {
        // Minimum bytes the Utf8View data buffer must hold: only strings
        // longer than the inline-view threshold land in the data buffer.
        // NOTE(review): 41 presumably = the two long values' UTF-8 lengths;
        // confirm against StringViewArray's inlining rules.
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Buffer 0 of a StringViewArray's ArrayData holds the spilled bytes.
        let data_buffer = string_view_array.to_data().buffers()[0].len();

        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }
1021
    #[test]
    fn test_numeric_view_allocation() {
        // Numbers coerced into Utf8View keep their literal textual form;
        // values longer than the inline threshold spill into the data buffer.
        // NOTE(review): confirm 33 against the coerced strings' lengths.
        let expected_capacity: usize = 33;

        let buf = r#"
        {"n": 123456789}
        {"n": 1000000000000}
        {"n": 3.1415}
        {"n": 2.718281828459045}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        // coerce_primitive = true so numbers become strings.
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Exact source text is preserved, including exponent-free floats.
        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1066
1067 #[test]
1068 fn test_string_with_uft8view() {
1069 let buf = r#"
1070 {"a": "1", "b": "2"}
1071 {"a": "hello", "b": "shoo"}
1072 {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
1073
1074 {"b": null}
1075 {"b": "", "a": null}
1076
1077 "#;
1078 let schema = Arc::new(Schema::new(vec![
1079 Field::new("a", DataType::Utf8View, true),
1080 Field::new("b", DataType::LargeUtf8, true),
1081 ]));
1082
1083 let batches = do_read(buf, 1024, false, false, schema);
1084 assert_eq!(batches.len(), 1);
1085
1086 let col1 = batches[0].column(0).as_string_view();
1087 assert_eq!(col1.null_count(), 2);
1088 assert_eq!(col1.value(0), "1");
1089 assert_eq!(col1.value(1), "hello");
1090 assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
1091 assert!(col1.is_null(3));
1092 assert!(col1.is_null(4));
1093 assert_eq!(col1.data_type(), &DataType::Utf8View);
1094
1095 let col2 = batches[0].column(1).as_string::<i64>();
1096 assert_eq!(col2.null_count(), 1);
1097 assert_eq!(col2.value(0), "2");
1098 assert_eq!(col2.value(1), "shoo");
1099 assert_eq!(col2.value(2), "\t😁foo");
1100 assert!(col2.is_null(3));
1101 assert_eq!(col2.value(4), "");
1102 }
1103
    #[test]
    fn test_complex() {
        // Nested lists, structs, struct-of-list, and null propagation.
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        {"list": null, "nested": {"a": null}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_list("list", Field::new("element", DataType::Int32, false), true),
            Field::new_struct(
                "nested",
                vec![
                    Field::new("a", DataType::Int32, true),
                    Field::new("b", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("c", DataType::Int32, false)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let list = batches[0].column(0).as_list::<i32>();
        assert_eq!(list.len(), 3);
        assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
        assert_eq!(list.null_count(), 1);
        assert!(list.is_null(2));
        let list_values = list.values().as_primitive::<Int32Type>();
        assert_eq!(list_values.values(), &[5, 6]);

        let nested = batches[0].column(1).as_struct();
        let a = nested.column(0).as_primitive::<Int32Type>();
        // NOTE(review): the next assertion and the `is_null` two lines below
        // re-check `list` rather than `nested`/`a` - looks like copy-paste
        // from the block above; confirm the intended target.
        assert_eq!(list.null_count(), 1);
        assert_eq!(a.values(), &[1, 7, 0]);
        assert!(list.is_null(2));

        let b = nested.column(1).as_primitive::<Int32Type>();
        assert_eq!(b.null_count(), 2);
        assert_eq!(b.len(), 3);
        assert_eq!(b.value(0), 2);
        assert!(b.is_null(1));
        assert!(b.is_null(2));

        // Row 3 omits "nested_list" entirely, so the struct is null there.
        let nested_list = batches[0].column(2).as_struct();
        assert_eq!(nested_list.len(), 3);
        assert_eq!(nested_list.null_count(), 1);
        assert!(nested_list.is_null(2));

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.len(), 3);
        assert_eq!(list2.null_count(), 1);
        assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
        assert!(list2.is_null(2));

        let list2_values = list2.values().as_struct();

        let c = list2_values.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[3, 4]);
    }
1177
    #[test]
    fn test_projection() {
        // Fields present in the JSON but absent from the schema ("list",
        // nested "b", "c") are silently dropped when strict mode is off.
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // Projected field "d": present only in the first element.
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1233
    #[test]
    fn test_map() {
        // Map columns with list values, including null values and empty lists.
        let buf = r#"
        {"map": {"a": ["foo", null]}}
        {"map": {"a": [null], "b": []}}
        {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // Entry counts per row: 1, 2, 2.
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        // The entry "c": null yields a null list value.
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1276
    #[test]
    fn test_not_coercing_primitive_into_string_without_flag() {
        // With coerce_primitive unset (default false), numbers and booleans
        // must NOT silently become strings - decoding errors instead.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));

        let buf = r#"{"a": 1}"#;
        let err = ReaderBuilder::new(schema.clone())
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got 1"
        );

        let buf = r#"{"a": true}"#;
        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got true"
        );
    }
1307
    #[test]
    fn test_coercing_primitive_into_string() {
        // With coerce_primitive = true, numbers and booleans decode into
        // string columns keeping their literal source text.
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        // Exponents and decimal points are preserved verbatim.
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1356
    /// Shared body for the decimal tests: decodes numbers and numeric strings
    /// into a decimal `data_type` with scale 2, so e.g. 1 -> 100, "2.0452" -> 204.
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        // Values scaled by 10^2; excess fractional digits are truncated.
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1406
    #[test]
    // Runs the shared decimal body across all four decimal widths.
    fn test_decimals() {
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1414
    /// Shared body for the timestamp tests: mixes RFC 3339 strings (with and
    /// without offsets), space-separated variants, raw integers, and floats
    /// (coerce_primitive truncates them to integers).
    fn test_timestamp<T: ArrowTimestampType>() {
        // NOTE(review): row 3 deliberately repeats key "b" - presumably the
        // last duplicate wins; confirm against the tape decoder.
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        // Column "d" carries an explicit +08:00 timezone: offset-free
        // strings are interpreted in that zone.
        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Expected values below are stated in nanoseconds, then divided
        // down to the unit under test.
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1496
    /// Runs the timestamp round-trip test for every supported time unit.
    #[test]
    fn test_timestamps() {
        test_timestamp::<TimestampSecondType>();
        test_timestamp::<TimestampMillisecondType>();
        test_timestamp::<TimestampMicrosecondType>();
        test_timestamp::<TimestampNanosecondType>();
    }
1504
    /// Decodes JSON into `Time32`/`Time64` columns, covering 12-hour
    /// ("AM"/"pm") and 24-hour strings, fractional seconds, raw numeric
    /// values, duplicate keys and nulls.
    fn test_time<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Expected values below are nanoseconds-since-midnight scaled down to
        // the unit under test.
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        // Null slots are masked; their backing values are 0.
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // Row 3 specifies "b" twice; the later value ("6:00 pm") wins.
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1576
    /// Runs the time-of-day round-trip test for all four time types.
    #[test]
    fn test_times() {
        test_time::<Time32MillisecondType>();
        test_time::<Time32SecondType>();
        test_time::<Time64MicrosecondType>();
        test_time::<Time64NanosecondType>();
    }
1584
1585 fn test_duration<T: ArrowTemporalType>() {
1586 let buf = r#"
1587 {"a": 1, "b": "2"}
1588 {"a": 3, "b": null}
1589 "#;
1590
1591 let schema = Arc::new(Schema::new(vec![
1592 Field::new("a", T::DATA_TYPE, true),
1593 Field::new("b", T::DATA_TYPE, true),
1594 ]));
1595
1596 let batches = do_read(buf, 1024, true, false, schema);
1597 assert_eq!(batches.len(), 1);
1598
1599 let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1600 assert_eq!(col_a.null_count(), 0);
1601 assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1602
1603 let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1604 assert_eq!(col2.null_count(), 1);
1605 assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1606 }
1607
    /// Runs the duration round-trip test for all four duration types.
    #[test]
    fn test_durations() {
        test_duration::<DurationNanosecondType>();
        test_duration::<DurationMicrosecondType>();
        test_duration::<DurationMillisecondType>();
        test_duration::<DurationSecondType>();
    }
1615
    /// Decodes a Delta Lake checkpoint-style record: the "protocol" struct is
    /// present in the JSON while the "add" struct (containing a map) is
    /// absent and must decode as null.
    #[test]
    fn test_delta_checkpoint() {
        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "protocol",
                vec![
                    Field::new("minReaderVersion", DataType::Int32, true),
                    Field::new("minWriterVersion", DataType::Int32, true),
                ],
                true,
            ),
            Field::new_struct(
                "add",
                vec![Field::new_map(
                    "partitionValues",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                    false,
                    false,
                )],
                true,
            ),
        ]));

        let batches = do_read(json, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Render the single row as text to check that the absent "add"
        // column came back null.
        let s: StructArray = batches.into_iter().next().unwrap().into();
        let opts = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
        assert_eq!(
            formatter.value(0).to_string(),
            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
        );
    }
1653
1654 #[test]
1655 fn struct_nullability() {
1656 let do_test = |child: DataType| {
1657 let non_null = r#"{"foo": {}}"#;
1659 let schema = Arc::new(Schema::new(vec![Field::new_struct(
1660 "foo",
1661 vec![Field::new("bar", child, false)],
1662 true,
1663 )]));
1664 let mut reader = ReaderBuilder::new(schema.clone())
1665 .build(Cursor::new(non_null.as_bytes()))
1666 .unwrap();
1667 assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": {bar: null}}"#;
1670 let mut reader = ReaderBuilder::new(schema.clone())
1671 .build(Cursor::new(null.as_bytes()))
1672 .unwrap();
1673 assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": null}"#;
1677 let mut reader = ReaderBuilder::new(schema)
1678 .build(Cursor::new(null.as_bytes()))
1679 .unwrap();
1680 let batch = reader.next().unwrap().unwrap();
1681 assert_eq!(batch.num_columns(), 1);
1682 let foo = batch.column(0).as_struct();
1683 assert_eq!(foo.len(), 1);
1684 assert!(foo.is_null(0));
1685 assert_eq!(foo.num_columns(), 1);
1686
1687 let bar = foo.column(0);
1688 assert_eq!(bar.len(), 1);
1689 assert!(bar.is_null(0));
1691 };
1692
1693 do_test(DataType::Boolean);
1694 do_test(DataType::Int32);
1695 do_test(DataType::Utf8);
1696 do_test(DataType::Decimal128(2, 1));
1697 do_test(DataType::Timestamp(
1698 TimeUnit::Microsecond,
1699 Some("+00:00".into()),
1700 ));
1701 }
1702
    /// i64/u64 values at the extremes of their ranges must round-trip
    /// exactly, whether supplied as JSON numbers or as quoted strings.
    #[test]
    fn test_truncation() {
        let buf = r#"
        {"i64": 9223372036854775807, "u64": 18446744073709551615 }
        {"i64": "9223372036854775807", "u64": "18446744073709551615" }
        {"i64": -9223372036854775808, "u64": 0 }
        {"i64": "-9223372036854775808", "u64": 0 }
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("i64", DataType::Int64, true),
            Field::new("u64", DataType::UInt64, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let i64 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);

        let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
        assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
    }
1726
    /// Nanosecond timestamps at i64::MAX / i64::MIN must round-trip exactly,
    /// and scientific notation (9e5) is accepted for timestamp columns.
    #[test]
    fn test_timestamp_truncation() {
        let buf = r#"
        {"time": 9223372036854775807 }
        {"time": -9223372036854775808 }
        {"time": 9e5 }
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "time",
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            true,
        )]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        let i64 = batches[0]
            .column(0)
            .as_primitive::<TimestampNanosecondType>();
        assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
    }
1749
    /// Strict mode succeeds when every JSON key — including keys nested
    /// inside struct values — is covered by the schema.
    #[test]
    fn test_strict_mode_no_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new("c", DataType::Boolean, false),
        ]));

        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);

        // Same again with a nested struct column "c".
        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct(
                "c",
                vec![
                    Field::new("a", DataType::Boolean, false),
                    Field::new("b", DataType::Int16, false),
                ],
                false,
            ),
        ]));

        let batches = do_read(buf, 1024, true, true, schema);
        assert_eq!(batches.len(), 1);
    }
1787
    /// Strict mode errors when the JSON contains a key absent from the
    /// schema, both at the top level and nested inside a struct (where the
    /// error message is prefixed with the enclosing field's name).
    #[test]
    fn test_strict_mode_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        // "b" appears in the data but not in the schema.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("c", DataType::Boolean, true),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: column 'b' missing from schema"
        );

        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        // Nested case: "c.b" appears in the data but not in the schema.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'c': column 'b' missing from schema"
        );
    }
1837
1838 fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1839 let file = File::open(path).unwrap();
1840 let mut reader = BufReader::new(file);
1841 let schema = schema.unwrap_or_else(|| {
1842 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1843 reader.rewind().unwrap();
1844 schema
1845 });
1846 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1847 builder.build(reader).unwrap()
1848 }
1849
    /// End-to-end read of test/data/basic.json with an inferred schema:
    /// checks column count and order, inferred types, and sampled values.
    #[test]
    fn test_json_basic() {
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }
1888
1889 #[test]
1890 fn test_json_empty_projection() {
1891 let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1892 let batch = reader.next().unwrap().unwrap();
1893
1894 assert_eq!(0, batch.num_columns());
1895 assert_eq!(12, batch.num_rows());
1896 }
1897
    /// Reads test/data/basic_nulls.json with an inferred schema and checks
    /// that null slots land where the input had nulls or missing keys.
    #[test]
    fn test_json_basic_with_nulls() {
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }
1937
    /// Reads basic.json with an explicit schema: the provided types (e.g.
    /// Float32 for "b") override what inference would have produced.
    #[test]
    fn test_json_basic_schema() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Boolean, false),
            Field::new("d", DataType::Utf8, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        let reader_schema = reader.schema();
        assert_eq!(reader_schema.as_ref(), &schema);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float32, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(100000000000000, aa.value(11));
        let bb = batch.column(b.0).as_primitive::<Float32Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
    }
1973
    /// Supplying a schema with a subset of the file's columns projects just
    /// those columns, in schema order.
    #[test]
    fn test_json_basic_schema_projection() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("c", DataType::Boolean, false),
        ]);

        let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(2, batch.num_columns());
        assert_eq!(2, batch.schema().fields().len());
        assert_eq!(12, batch.num_rows());

        assert_eq!(batch.schema().as_ref(), &schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(1, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
    }
1997
1998 #[test]
1999 fn test_json_arrays() {
2000 let mut reader = read_file("test/data/arrays.json", None);
2001 let batch = reader.next().unwrap().unwrap();
2002
2003 assert_eq!(4, batch.num_columns());
2004 assert_eq!(3, batch.num_rows());
2005
2006 let schema = batch.schema();
2007
2008 let a = schema.column_with_name("a").unwrap();
2009 assert_eq!(&DataType::Int64, a.1.data_type());
2010 let b = schema.column_with_name("b").unwrap();
2011 assert_eq!(
2012 &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
2013 b.1.data_type()
2014 );
2015 let c = schema.column_with_name("c").unwrap();
2016 assert_eq!(
2017 &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
2018 c.1.data_type()
2019 );
2020 let d = schema.column_with_name("d").unwrap();
2021 assert_eq!(&DataType::Utf8, d.1.data_type());
2022
2023 let aa = batch.column(a.0).as_primitive::<Int64Type>();
2024 assert_eq!(1, aa.value(0));
2025 assert_eq!(-10, aa.value(1));
2026 assert_eq!(1627668684594000000, aa.value(2));
2027 let bb = batch.column(b.0).as_list::<i32>();
2028 let bb = bb.values().as_primitive::<Float64Type>();
2029 assert_eq!(9, bb.len());
2030 assert_eq!(2.0, bb.value(0));
2031 assert_eq!(-6.1, bb.value(5));
2032 assert!(!bb.is_valid(7));
2033
2034 let cc = batch
2035 .column(c.0)
2036 .as_any()
2037 .downcast_ref::<ListArray>()
2038 .unwrap();
2039 let cc = cc.values().as_boolean();
2040 assert_eq!(6, cc.len());
2041 assert!(!cc.value(0));
2042 assert!(!cc.value(4));
2043 assert!(!cc.is_valid(5));
2044 }
2045
    /// An empty list decodes as an empty (non-null) value, while an explicit
    /// null and an absent key both decode as null list entries.
    #[test]
    fn test_empty_json_arrays() {
        let json_content = r#"
        {"items": []}
        {"items": null}
        {}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
            true,
        )]));

        let batches = do_read(json_content, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_list::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.value(0).is_empty());
        assert_eq!(col1.value(0).data_type(), &DataType::Null);
        assert!(col1.is_null(1));
        assert!(col1.is_null(2));
    }
2070
2071 #[test]
2072 fn test_nested_empty_json_arrays() {
2073 let json_content = r#"
2074 {"items": [[],[]]}
2075 {"items": [[null, null],[null]]}
2076 "#;
2077
2078 let schema = Arc::new(Schema::new(vec![Field::new(
2079 "items",
2080 DataType::List(FieldRef::new(Field::new_list_field(
2081 DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2082 true,
2083 ))),
2084 true,
2085 )]));
2086
2087 let batches = do_read(json_content, 1024, false, false, schema);
2088 assert_eq!(batches.len(), 1);
2089
2090 let col1 = batches[0].column(0).as_list::<i32>();
2091 assert_eq!(col1.null_count(), 0);
2092 assert_eq!(col1.value(0).len(), 2);
2093 assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
2094 assert!(col1.value(0).as_list::<i32>().value(1).is_empty());
2095
2096 assert_eq!(col1.value(1).len(), 2);
2097 assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
2098 assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
2099 }
2100
    /// Decodes a list-of-structs column (with a nested struct child) and
    /// compares against an expected array built by hand with
    /// `ArrayDataBuilder`, including offsets and validity bitmaps.
    #[test]
    fn test_nested_list_json_arrays() {
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        // 6 rows yielding 7 flattened struct elements (2 + 1 + 3 + 0 + 0 + 1).
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // Expected flattened child arrays; validity bitmaps below are
        // little-endian with bit 0 = element 0.
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        // "c" is null at elements 2 (explicit null) and 6 (null struct).
        let c = ArrayDataBuilder::new(c_field.data_type().clone())
            .len(7)
            .add_child_data(d.to_data())
            .null_bit_buffer(Some(Buffer::from([0b00111011])))
            .build()
            .unwrap();
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        // The struct itself is null only at element 6 (the `[null]` row).
        let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
            .len(7)
            .add_child_data(b.to_data())
            .add_child_data(c.clone())
            .null_bit_buffer(Some(Buffer::from([0b00111111])))
            .build()
            .unwrap();
        // List offsets per row; row 3 ({"a": null}) is the null list entry.
        let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
            .len(6)
            .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
            .add_child_data(a)
            .null_bit_buffer(Some(Buffer::from([0b00110111])))
            .build()
            .unwrap();
        let expected = make_array(a_list);

        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        let read: &ListArray = read.as_list::<i32>();
        let expected = expected.as_list::<i32>();
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        assert_eq!(read.nulls(), expected.nulls());
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.to_data(), c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        assert_eq!(read, expected);
    }
2194
2195 #[test]
2196 fn test_skip_empty_lines() {
2197 let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2198 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
2199 let json_content = "
2200 {\"a\": 1}
2201 {\"a\": 2}
2202 {\"a\": 3}";
2203 let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2204 let batch = reader.next().unwrap().unwrap();
2205
2206 assert_eq!(1, batch.num_columns());
2207 assert_eq!(3, batch.num_rows());
2208
2209 let schema = reader.schema();
2210 let c = schema.column_with_name("a").unwrap();
2211 assert_eq!(&DataType::Int64, c.1.data_type());
2212 }
2213
2214 #[test]
2215 fn test_with_multiple_batches() {
2216 let file = File::open("test/data/basic_nulls.json").unwrap();
2217 let mut reader = BufReader::new(file);
2218 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2219 reader.rewind().unwrap();
2220
2221 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2222 let mut reader = builder.build(reader).unwrap();
2223
2224 let mut num_records = Vec::new();
2225 while let Some(rb) = reader.next().transpose().unwrap() {
2226 num_records.push(rb.num_rows());
2227 }
2228
2229 assert_eq!(vec![5, 5, 2], num_records);
2230 }
2231
    /// Reads basic_nulls.json with column "a" coerced to Timestamp(Second):
    /// numeric JSON values become timestamps and nulls are preserved.
    #[test]
    fn test_timestamp_from_json_seconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Second, None),
            a.1.data_type()
        );

        let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2264
    /// Reads basic_nulls.json with column "a" coerced to
    /// Timestamp(Millisecond): numeric values decode and nulls are preserved.
    #[test]
    fn test_timestamp_from_json_milliseconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(
            &DataType::Timestamp(TimeUnit::Millisecond, None),
            a.1.data_type()
        );

        let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2297
    /// Reads basic_nulls.json with column "a" coerced to Date64: numeric
    /// values decode and nulls are preserved.
    #[test]
    fn test_date_from_json_milliseconds() {
        let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Date64, a.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Date64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2323
    /// Reads basic_nulls.json with column "a" coerced to Time64(Nanosecond):
    /// numeric values decode and nulls are preserved.
    #[test]
    fn test_time_from_json_nanoseconds() {
        let schema = Schema::new(vec![Field::new(
            "a",
            DataType::Time64(TimeUnit::Nanosecond),
            true,
        )]);

        let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(1, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(2));
        assert_eq!(1, aa.value(0));
        assert_eq!(1, aa.value(3));
        assert_eq!(5, aa.value(7));
    }
2353
    /// Consumes the reader through its `Iterator` impl and aggregates across
    /// batches: total row count, batch count, and the sum of column "a".
    #[test]
    fn test_json_iterator() {
        let file = File::open("test/data/basic.json").unwrap();
        let mut reader = BufReader::new(file);
        let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
        reader.rewind().unwrap();

        let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
        let reader = builder.build(reader).unwrap();
        let schema = reader.schema();
        let (col_a_index, _) = schema.column_with_name("a").unwrap();

        let mut sum_num_rows = 0;
        let mut num_batches = 0;
        let mut sum_a = 0;
        for batch in reader {
            let batch = batch.unwrap();
            assert_eq!(8, batch.num_columns());
            sum_num_rows += batch.num_rows();
            num_batches += 1;
            let batch_schema = batch.schema();
            assert_eq!(schema, batch_schema);
            let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
            sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
        }
        assert_eq!(12, sum_num_rows);
        assert_eq!(3, num_batches);
        assert_eq!(100000000000011, sum_a);
    }
2383
    /// Error-path coverage for struct decoding: a truncated document stays
    /// buffered as a partial row even after a failed flush, and type
    /// mismatches report the offending field path and the JSON value found.
    #[test]
    fn test_decoder_error() {
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "a",
            vec![Field::new("child", DataType::Int32, false)],
            true,
        )]));

        // Feed a truncated document: the tape decoder keeps the partial row
        // buffered both before and after flush() errors.
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
        let _ = decoder.flush().unwrap_err();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

        // Decodes `s` against the struct schema and returns the error text.
        let parse_err = |s: &str| {
            ReaderBuilder::new(schema.clone())
                .build(Cursor::new(s.as_bytes()))
                .unwrap()
                .next()
                .unwrap()
                .unwrap_err()
                .to_string()
        };

        let err = parse_err(r#"{"a": 123}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got 123"
        );

        let err = parse_err(r#"{"a": ["bar"]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
        );

        let err = parse_err(r#"{"a": []}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got []"
        );

        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
        );

        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
        );

        let err = parse_err(r#"{"a": true}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got true"
        );

        let err = parse_err(r#"{"a": false}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got false"
        );

        let err = parse_err(r#"{"a": "foo"}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got \"foo\""
        );

        // Nested mismatches report the full field path.
        let err = parse_err(r#"{"a": {"child": false}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
        );

        let err = parse_err(r#"{"a": {"child": []}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
        );

        let err = parse_err(r#"{"a": {"child": [123]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
        );

        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
        );
    }
2482
    /// `serialize` accepts both epoch-second integers and RFC3339 strings for
    /// a Timestamp(Second) column; the "+02:00" offset shifts the epoch
    /// value to -7200.
    #[test]
    fn test_serialize_timestamp() {
        let json = vec![
            json!({"timestamp": 1681319393}),
            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
        ];
        let schema = Schema::new(vec![Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 1);
        let values = batch.column(0).as_primitive::<TimestampSecondType>();
        assert_eq!(values.values(), &[1681319393, -7200]);
    }
2504
2505 #[test]
2506 fn test_serialize_decimal() {
2507 let json = vec![
2508 json!({"decimal": 1.234}),
2509 json!({"decimal": "1.234"}),
2510 json!({"decimal": 1234}),
2511 json!({"decimal": "1234"}),
2512 ];
2513 let schema = Schema::new(vec![Field::new(
2514 "decimal",
2515 DataType::Decimal128(10, 3),
2516 true,
2517 )]);
2518 let mut decoder = ReaderBuilder::new(Arc::new(schema))
2519 .build_decoder()
2520 .unwrap();
2521 decoder.serialize(&json).unwrap();
2522 let batch = decoder.flush().unwrap().unwrap();
2523 assert_eq!(batch.num_rows(), 4);
2524 assert_eq!(batch.num_columns(), 1);
2525 let values = batch.column(0).as_primitive::<Decimal128Type>();
2526 assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2527 }
2528
2529 #[test]
2530 fn test_serde_field() {
2531 let field = Field::new("int", DataType::Int32, true);
2532 let mut decoder = ReaderBuilder::new_with_field(field)
2533 .build_decoder()
2534 .unwrap();
2535 decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2536 let b = decoder.flush().unwrap().unwrap();
2537 let values = b.column(0).as_primitive::<Int32Type>().values();
2538 assert_eq!(values, &[1, 2, 3, 4]);
2539 }
2540
2541 #[test]
2542 fn test_serde_large_numbers() {
2543 let field = Field::new("int", DataType::Int64, true);
2544 let mut decoder = ReaderBuilder::new_with_field(field)
2545 .build_decoder()
2546 .unwrap();
2547
2548 decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2549 let b = decoder.flush().unwrap().unwrap();
2550 let values = b.column(0).as_primitive::<Int64Type>().values();
2551 assert_eq!(values, &[1699148028689, 2, 3, 4]);
2552
2553 let field = Field::new(
2554 "int",
2555 DataType::Timestamp(TimeUnit::Microsecond, None),
2556 true,
2557 );
2558 let mut decoder = ReaderBuilder::new_with_field(field)
2559 .build_decoder()
2560 .unwrap();
2561
2562 decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2563 let b = decoder.flush().unwrap().unwrap();
2564 let values = b
2565 .column(0)
2566 .as_primitive::<TimestampMicrosecondType>()
2567 .values();
2568 assert_eq!(values, &[1699148028689, 2, 3, 4]);
2569 }
2570
2571 #[test]
2572 fn test_coercing_primitive_into_string_decoder() {
2573 let buf = &format!(
2574 r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
2575 (i32::MAX as i64 + 10),
2576 i64::MAX - 10
2577 );
2578 let schema = Schema::new(vec![
2579 Field::new("a", DataType::Float64, true),
2580 Field::new("b", DataType::Utf8, true),
2581 Field::new("c", DataType::Utf8, true),
2582 ]);
2583 let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
2584 let schema_ref = Arc::new(schema);
2585
2586 let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
2588 let mut decoder = reader.build_decoder().unwrap();
2589 decoder.serialize(json_array.as_slice()).unwrap();
2590 let batch = decoder.flush().unwrap().unwrap();
2591 assert_eq!(
2592 batch,
2593 RecordBatch::try_new(
2594 schema_ref,
2595 vec![
2596 Arc::new(Float64Array::from(vec![
2597 1.0,
2598 2.0,
2599 (i32::MAX as i64 + 10) as f64,
2600 (i64::MAX - 10) as f64
2601 ])),
2602 Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
2603 Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
2604 ]
2605 )
2606 .unwrap()
2607 );
2608 }
2609
2610 fn _parse_structs(
2615 row: &str,
2616 struct_mode: StructMode,
2617 fields: Fields,
2618 as_struct: bool,
2619 ) -> Result<RecordBatch, ArrowError> {
2620 let builder = if as_struct {
2621 ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2622 } else {
2623 ReaderBuilder::new(Arc::new(Schema::new(fields)))
2624 };
2625 builder
2626 .with_struct_mode(struct_mode)
2627 .build(Cursor::new(row.as_bytes()))
2628 .unwrap()
2629 .next()
2630 .unwrap()
2631 }
2632
    #[test]
    fn test_struct_decoding_list_length() {
        use arrow_array::array;

        // A two-element list row, decoded against schemas with 1, 2, and 3 fields.
        let row = "[1, 2]";

        let mut fields = vec![Field::new("a", DataType::Int32, true)];
        let too_few_fields = Fields::from(fields.clone());
        fields.push(Field::new("b", DataType::Int32, true));
        let correct_fields = Fields::from(fields.clone());
        fields.push(Field::new("c", DataType::Int32, true));
        let too_many_fields = Fields::from(fields.clone());

        // ListOnly mode: each top-level JSON array maps positionally to fields.
        let parse = |fields: Fields, as_struct: bool| {
            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
        };

        // The batch expected when the field count matches the row exactly.
        let expected_row = StructArray::new(
            correct_fields.clone(),
            vec![
                Arc::new(array::Int32Array::from(vec![1])),
                Arc::new(array::Int32Array::from(vec![2])),
            ],
            None,
        );
        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

        // Fewer fields than columns: error, both wrapped in a struct and not.
        assert_eq!(
            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(too_few_fields, false).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        // Matching field count: decodes successfully in both shapes.
        assert_eq!(
            parse(correct_fields.clone(), true).unwrap(),
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![row_field])),
                vec![Arc::new(expected_row.clone())]
            )
            .unwrap()
        );
        assert_eq!(
            parse(correct_fields, false).unwrap(),
            RecordBatch::from(expected_row)
        );
        // More fields than columns: error, both wrapped and unwrapped.
        assert_eq!(
            parse(too_many_fields.clone(), true)
                .unwrap_err()
                .to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
        assert_eq!(
            parse(too_many_fields, false).unwrap_err().to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
    }
2691
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        // The same logical row {b: [1, 2], c: {d: 3}} in three encodings:
        // fully object-keyed, fully positional (list), and a mix of the two.
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        // Expected "b": a single list element [1, 2].
        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        // Expected "c": a single-entry map {"d": 3}.
        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly accepts the object encoding and rejects any list syntax,
        // reporting the offending value at top level or within field 'a'.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly is the mirror image: list encoding decodes, objects error.
        // NOTE(review): the expected text renders the object without a comma
        // between entries ("[1, 2]\"c\"") — this appears to match how the tape
        // formatter prints objects; confirm against the error-formatting code.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
2775
2776 #[test]
2782 fn test_struct_decoding_empty_list() {
2783 let int_field = Field::new("a", DataType::Int32, true);
2784 let struct_field = Field::new(
2785 "r",
2786 DataType::Struct(Fields::from(vec![int_field.clone()])),
2787 true,
2788 );
2789
2790 let parse = |row: &str, as_struct: bool, field: Field| {
2791 _parse_structs(
2792 row,
2793 StructMode::ListOnly,
2794 Fields::from(vec![field]),
2795 as_struct,
2796 )
2797 };
2798
2799 assert_eq!(
2801 parse("[]", true, struct_field.clone())
2802 .unwrap_err()
2803 .to_string(),
2804 "Json error: found 0 columns for 1 fields".to_owned()
2805 );
2806 assert_eq!(
2807 parse("[]", false, int_field.clone())
2808 .unwrap_err()
2809 .to_string(),
2810 "Json error: found 0 columns for 1 fields".to_owned()
2811 );
2812 assert_eq!(
2813 parse("[]", false, struct_field.clone())
2814 .unwrap_err()
2815 .to_string(),
2816 "Json error: found 0 columns for 1 fields".to_owned()
2817 );
2818 assert_eq!(
2819 parse("[[]]", false, struct_field.clone())
2820 .unwrap_err()
2821 .to_string(),
2822 "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
2823 );
2824 }
2825
2826 #[test]
2827 fn test_decode_list_struct_with_wrong_types() {
2828 let int_field = Field::new("a", DataType::Int32, true);
2829 let struct_field = Field::new(
2830 "r",
2831 DataType::Struct(Fields::from(vec![int_field.clone()])),
2832 true,
2833 );
2834
2835 let parse = |row: &str, as_struct: bool, field: Field| {
2836 _parse_structs(
2837 row,
2838 StructMode::ListOnly,
2839 Fields::from(vec![field]),
2840 as_struct,
2841 )
2842 };
2843
2844 assert_eq!(
2846 parse(r#"[["a"]]"#, false, struct_field.clone())
2847 .unwrap_err()
2848 .to_string(),
2849 "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2850 );
2851 assert_eq!(
2852 parse(r#"[["a"]]"#, true, struct_field.clone())
2853 .unwrap_err()
2854 .to_string(),
2855 "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2856 );
2857 assert_eq!(
2858 parse(r#"["a"]"#, true, int_field.clone())
2859 .unwrap_err()
2860 .to_string(),
2861 "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2862 );
2863 assert_eq!(
2864 parse(r#"["a"]"#, false, int_field.clone())
2865 .unwrap_err()
2866 .to_string(),
2867 "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
2868 );
2869 }
2870
2871 #[test]
2872 fn test_read_run_end_encoded() {
2873 let buf = r#"
2874 {"a": "x"}
2875 {"a": "x"}
2876 {"a": "y"}
2877 {"a": "y"}
2878 {"a": "y"}
2879 "#;
2880
2881 let ree_type = DataType::RunEndEncoded(
2882 Arc::new(Field::new("run_ends", DataType::Int32, false)),
2883 Arc::new(Field::new("values", DataType::Utf8, true)),
2884 );
2885 let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2886 let batches = do_read(buf, 1024, false, false, schema);
2887 assert_eq!(batches.len(), 1);
2888
2889 let col = batches[0].column(0);
2890 let run_array = col.as_run::<arrow_array::types::Int32Type>();
2891
2892 assert_eq!(run_array.len(), 5);
2894 assert_eq!(run_array.run_ends().values(), &[2, 5]);
2895
2896 let values = run_array.values().as_string::<i32>();
2897 assert_eq!(values.len(), 2);
2898 assert_eq!(values.value(0), "x");
2899 assert_eq!(values.value(1), "y");
2900 }
2901
2902 #[test]
2903 fn test_read_run_end_encoded_consecutive_nulls() {
2904 let buf = r#"
2905 {"a": "x"}
2906 {}
2907 {}
2908 {}
2909 {"a": "y"}
2910 "#;
2911
2912 let ree_type = DataType::RunEndEncoded(
2913 Arc::new(Field::new("run_ends", DataType::Int32, false)),
2914 Arc::new(Field::new("values", DataType::Utf8, true)),
2915 );
2916 let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2917 let batches = do_read(buf, 1024, false, false, schema);
2918 assert_eq!(batches.len(), 1);
2919
2920 let col = batches[0].column(0);
2921 let run_array = col.as_run::<arrow_array::types::Int32Type>();
2922
2923 assert_eq!(run_array.len(), 5);
2925 assert_eq!(run_array.run_ends().values(), &[1, 4, 5]);
2926
2927 let values = run_array.values().as_string::<i32>();
2928 assert_eq!(values.len(), 3);
2929 assert_eq!(values.value(0), "x");
2930 assert!(values.is_null(1));
2931 assert_eq!(values.value(2), "y");
2932 }
2933
2934 #[test]
2935 fn test_read_run_end_encoded_all_unique() {
2936 let buf = r#"
2937 {"a": 1}
2938 {"a": 2}
2939 {"a": 3}
2940 "#;
2941
2942 let ree_type = DataType::RunEndEncoded(
2943 Arc::new(Field::new("run_ends", DataType::Int32, false)),
2944 Arc::new(Field::new("values", DataType::Int32, true)),
2945 );
2946 let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2947 let batches = do_read(buf, 1024, false, false, schema);
2948 assert_eq!(batches.len(), 1);
2949
2950 let col = batches[0].column(0);
2951 let run_array = col.as_run::<arrow_array::types::Int32Type>();
2952
2953 assert_eq!(run_array.len(), 3);
2955 assert_eq!(run_array.run_ends().values(), &[1, 2, 3]);
2956 }
2957
2958 #[test]
2959 fn test_read_run_end_encoded_int16_run_ends() {
2960 let buf = r#"
2961 {"a": "x"}
2962 {"a": "x"}
2963 {"a": "y"}
2964 "#;
2965
2966 let ree_type = DataType::RunEndEncoded(
2967 Arc::new(Field::new("run_ends", DataType::Int16, false)),
2968 Arc::new(Field::new("values", DataType::Utf8, true)),
2969 );
2970 let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
2971 let batches = do_read(buf, 1024, false, false, schema);
2972 assert_eq!(batches.len(), 1);
2973
2974 let col = batches[0].column(0);
2975 let run_array = col.as_run::<arrow_array::types::Int16Type>();
2976
2977 assert_eq!(run_array.len(), 3);
2978 assert_eq!(run_array.run_ends().values(), &[2i16, 3]);
2979 }
2980}