1use std::borrow::Cow;
137use std::io::BufRead;
138use std::sync::Arc;
139
140use arrow_array::cast::AsArray;
141use arrow_array::timezone::Tz;
142use arrow_array::types::*;
143use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader, downcast_integer};
144use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
145use chrono::Utc;
146use serde_core::Serialize;
147
148use crate::StructMode;
149use crate::reader::binary_array::{
150 BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
151};
152use crate::reader::boolean_array::BooleanArrayDecoder;
153use crate::reader::decimal_array::DecimalArrayDecoder;
154use crate::reader::list_array::{ListArrayDecoder, ListViewArrayDecoder};
155use crate::reader::map_array::MapArrayDecoder;
156use crate::reader::null_array::NullArrayDecoder;
157use crate::reader::primitive_array::PrimitiveArrayDecoder;
158use crate::reader::run_end_array::RunEndEncodedArrayDecoder;
159use crate::reader::string_array::StringArrayDecoder;
160use crate::reader::string_view_array::StringViewArrayDecoder;
161use crate::reader::struct_array::StructArrayDecoder;
162use crate::reader::tape::{Tape, TapeDecoder};
163use crate::reader::timestamp_array::TimestampArrayDecoder;
164
165pub use schema::*;
166pub use value_iter::ValueIter;
167
168mod binary_array;
169mod boolean_array;
170mod decimal_array;
171mod list_array;
172mod map_array;
173mod null_array;
174mod primitive_array;
175mod run_end_array;
176mod schema;
177mod serializer;
178mod string_array;
179mod string_view_array;
180mod struct_array;
181mod tape;
182mod timestamp_array;
183mod value_iter;
184
/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
    // Maximum number of rows per emitted RecordBatch; also sizes the tape decoder
    batch_size: usize,
    // If true, number/boolean JSON values may be coerced into string columns
    coerce_primitive: bool,
    // Strict decoding flag, forwarded to the individual array decoders
    // NOTE(review): exact strict-mode semantics live in the decoders — confirm there
    strict_mode: bool,
    // Type-conflict handling flag, forwarded to the individual array decoders
    // NOTE(review): exact semantics live in the decoders — confirm there
    ignore_type_conflicts: bool,
    // If true, `schema` wraps a single field and each top-level JSON value is
    // decoded as that field rather than as an object of the schema's columns
    is_field: bool,
    // Whether structs are decoded by key name or by position
    struct_mode: StructMode,

    // Schema of the RecordBatches the built reader/decoder will produce
    schema: SchemaRef,
}
196
197impl ReaderBuilder {
198 pub fn new(schema: SchemaRef) -> Self {
207 Self {
208 batch_size: 1024,
209 coerce_primitive: false,
210 strict_mode: false,
211 ignore_type_conflicts: false,
212 is_field: false,
213 struct_mode: Default::default(),
214 schema,
215 }
216 }
217
218 pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
249 Self {
250 batch_size: 1024,
251 coerce_primitive: false,
252 strict_mode: false,
253 ignore_type_conflicts: false,
254 is_field: true,
255 struct_mode: Default::default(),
256 schema: Arc::new(Schema::new([field.into()])),
257 }
258 }
259
260 pub fn with_batch_size(self, batch_size: usize) -> Self {
262 Self { batch_size, ..self }
263 }
264
265 pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
268 Self {
269 coerce_primitive,
270 ..self
271 }
272 }
273
274 pub fn with_strict_mode(self, strict_mode: bool) -> Self {
280 Self {
281 strict_mode,
282 ..self
283 }
284 }
285
286 pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
290 Self {
291 struct_mode,
292 ..self
293 }
294 }
295
296 pub fn with_ignore_type_conflicts(self, ignore_type_conflicts: bool) -> Self {
309 Self {
310 ignore_type_conflicts,
311 ..self
312 }
313 }
314
315 pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
317 Ok(Reader {
318 reader,
319 decoder: self.build_decoder()?,
320 })
321 }
322
323 pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
325 let (data_type, nullable) = if self.is_field {
326 let field = &self.schema.fields[0];
327 let data_type = Cow::Borrowed(field.data_type());
328 (data_type, field.is_nullable())
329 } else {
330 let data_type = Cow::Owned(DataType::Struct(self.schema.fields.clone()));
331 (data_type, false)
332 };
333
334 let ctx = DecoderContext {
335 coerce_primitive: self.coerce_primitive,
336 strict_mode: self.strict_mode,
337 struct_mode: self.struct_mode,
338 ignore_type_conflicts: self.ignore_type_conflicts,
339 };
340 let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?;
341
342 let num_fields = self.schema.flattened_fields().len();
343
344 Ok(Decoder {
345 decoder,
346 is_field: self.is_field,
347 tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
348 batch_size: self.batch_size,
349 schema: self.schema,
350 })
351 }
352}
353
/// A [`RecordBatch`]-producing JSON reader wrapping a [`BufRead`] byte source
pub struct Reader<R> {
    // Buffered byte source the JSON is pulled from
    reader: R,
    // Push-based decoder the bytes are fed into
    decoder: Decoder,
}
361
362impl<R> std::fmt::Debug for Reader<R> {
363 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
364 f.debug_struct("Reader")
365 .field("decoder", &self.decoder)
366 .finish()
367 }
368}
369
370impl<R: BufRead> Reader<R> {
371 fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
373 loop {
374 let buf = self.reader.fill_buf()?;
375 if buf.is_empty() {
376 break;
377 }
378 let read = buf.len();
379
380 let decoded = self.decoder.decode(buf)?;
381 self.reader.consume(decoded);
382 if decoded != read {
383 break;
384 }
385 }
386 self.decoder.flush()
387 }
388}
389
390impl<R: BufRead> Iterator for Reader<R> {
391 type Item = Result<RecordBatch, ArrowError>;
392
393 fn next(&mut self) -> Option<Self::Item> {
394 self.read().transpose()
395 }
396}
397
398impl<R: BufRead> RecordBatchReader for Reader<R> {
399 fn schema(&self) -> SchemaRef {
400 self.decoder.schema.clone()
401 }
402}
403
/// A push-based, low-level interface for decoding JSON data into RecordBatches
pub struct Decoder {
    // Parses raw JSON bytes into an intermediate tape representation
    tape_decoder: TapeDecoder,
    // Converts tape rows into the root arrow array
    decoder: Box<dyn ArrayDecoder>,
    // Configured rows-per-batch; used to size the tape decoder and in Debug output
    batch_size: usize,
    // If true, each top-level value decodes as the schema's single field
    is_field: bool,
    // Schema of the RecordBatches produced by `flush`
    schema: SchemaRef,
}
451
452impl std::fmt::Debug for Decoder {
453 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
454 f.debug_struct("Decoder")
455 .field("schema", &self.schema)
456 .field("batch_size", &self.batch_size)
457 .finish()
458 }
459}
460
461impl Decoder {
462 pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
471 self.tape_decoder.decode(buf)
472 }
473
474 pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
651 self.tape_decoder.serialize(rows)
652 }
653
654 pub fn has_partial_record(&self) -> bool {
656 self.tape_decoder.has_partial_row()
657 }
658
659 pub fn len(&self) -> usize {
661 self.tape_decoder.num_buffered_rows()
662 }
663
664 pub fn is_empty(&self) -> bool {
666 self.len() == 0
667 }
668
669 pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
676 let tape = self.tape_decoder.finish()?;
677
678 if tape.num_rows() == 0 {
679 return Ok(None);
680 }
681
682 let mut next_object = 1;
684 let pos: Vec<_> = (0..tape.num_rows())
685 .map(|_| {
686 let next = tape.next(next_object, "row").unwrap();
687 std::mem::replace(&mut next_object, next)
688 })
689 .collect();
690
691 let decoded = self.decoder.decode(&tape, &pos)?;
692 self.tape_decoder.clear();
693
694 let batch = match self.is_field {
695 true => RecordBatch::try_new(self.schema.clone(), vec![decoded])?,
696 false => {
697 RecordBatch::from(decoded.as_struct().clone()).with_schema(self.schema.clone())?
698 }
699 };
700
701 Ok(Some(batch))
702 }
703}
704
/// Decodes rows from a [`Tape`] into an arrow array
trait ArrayDecoder: Send {
    // Decode the values located at the tape offsets in `pos` into an [`ArrayRef`]
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError>;
}
709
/// Shared options consulted by the individual [`ArrayDecoder`] implementations
pub struct DecoderContext {
    // Coerce number/boolean values into string columns
    coerce_primitive: bool,
    // Strict decoding flag (semantics defined by the decoders)
    strict_mode: bool,
    // Decode structs by key name or by position
    struct_mode: StructMode,
    // Type-conflict handling flag (semantics defined by the decoders)
    ignore_type_conflicts: bool,
}
724
725impl DecoderContext {
726 pub fn coerce_primitive(&self) -> bool {
728 self.coerce_primitive
729 }
730
731 pub fn strict_mode(&self) -> bool {
733 self.strict_mode
734 }
735
736 pub fn struct_mode(&self) -> StructMode {
738 self.struct_mode
739 }
740
741 pub fn ignore_type_conflicts(&self) -> bool {
743 self.ignore_type_conflicts
744 }
745
746 fn make_decoder(
751 &self,
752 data_type: &DataType,
753 is_nullable: bool,
754 ) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
755 make_decoder(self, data_type, is_nullable)
756 }
757}
758
// Constructs the [`ArrayDecoder`] tree for `data_type`
//
// `is_nullable` indicates whether decoded values may legally be null; it is
// forwarded to the nested decoders that track validity.
fn make_decoder(
    ctx: &DecoderContext,
    data_type: &DataType,
    is_nullable: bool,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    // Helper macros keep the large match readable: each expands to the
    // construction of the corresponding decoder
    macro_rules! primitive_decoder {
        ($t:ty, $data_type:expr) => {
            Ok(Box::new(PrimitiveArrayDecoder::<$t>::new(ctx, $data_type)))
        };
    }
    macro_rules! timestamp_decoder {
        ($t:ty, $data_type:expr, $tz:expr) => {{
            Ok(Box::new(TimestampArrayDecoder::<$t, _>::new(
                ctx, $data_type, $tz,
            )))
        }};
    }
    macro_rules! decimal_decoder {
        ($t:ty, $p:expr, $s:expr) => {
            Ok(Box::new(DecimalArrayDecoder::<$t>::new(ctx, $p, $s)))
        };
    }

    // `downcast_integer!` expands the first line into one `primitive_decoder!`
    // arm per integer type; the explicit arms below handle everything else
    downcast_integer! {
        *data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::new(NullArrayDecoder::new(ctx))),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        // Timestamps without a timezone are decoded relative to UTC
        DataType::Timestamp(TimeUnit::Second, None) => {
            timestamp_decoder!(TimestampSecondType, data_type, Utc)
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            timestamp_decoder!(TimestampMillisecondType, data_type, Utc)
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            timestamp_decoder!(TimestampMicrosecondType, data_type, Utc)
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            timestamp_decoder!(TimestampNanosecondType, data_type, Utc)
        },
        // Timezone strings are parsed here, so an invalid timezone fails fast
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            timestamp_decoder!(TimestampSecondType, data_type, tz)
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            timestamp_decoder!(TimestampMillisecondType, data_type, tz)
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            timestamp_decoder!(TimestampMicrosecondType, data_type, tz)
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            timestamp_decoder!(TimestampNanosecondType, data_type, tz)
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal32(p, s) => decimal_decoder!(Decimal32Type, p, s),
        DataType::Decimal64(p, s) => decimal_decoder!(Decimal64Type, p, s),
        DataType::Decimal128(p, s) => decimal_decoder!(Decimal128Type, p, s),
        DataType::Decimal256(p, s) => decimal_decoder!(Decimal256Type, p, s),
        DataType::Boolean => Ok(Box::new(BooleanArrayDecoder::new(ctx))),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(ctx))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(ctx))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(ctx))),
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
        DataType::ListView(_) => Ok(Box::new(ListViewArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
        DataType::LargeListView(_) => Ok(Box::new(ListViewArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::RunEndEncoded(ref r, _) => match r.data_type() {
            DataType::Int16 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int16Type>::new(ctx, data_type, is_nullable)?)),
            DataType::Int32 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int32Type>::new(ctx, data_type, is_nullable)?)),
            DataType::Int64 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int64Type>::new(ctx, data_type, is_nullable)?)),
            // NOTE(review): presumably run-end index types are restricted to
            // Int16/Int32/Int64 upstream, making this truly unreachable — confirm
            d => unreachable!("unsupported run end index type: {d}"),
        },
        _ => Err(ArrowError::NotYetImplemented(format!("Support for {data_type} in JSON reader")))
    }
}
853
854#[cfg(test)]
855mod tests {
856 use arrow_array::cast::AsArray;
857 use arrow_array::{
858 Array, BooleanArray, Float64Array, GenericListViewArray, Int32Array, ListArray, MapArray,
859 NullArray, OffsetSizeTrait, StringArray, StringViewArray, StructArray, make_array,
860 };
861 use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer};
862 use arrow_cast::display::{ArrayFormatter, FormatOptions};
863 use arrow_data::ArrayDataBuilder;
864 use arrow_schema::{Field, Fields};
865 use serde_json::json;
866 use std::fs::File;
867 use std::io::{BufReader, Cursor, Seek};
868
869 use super::*;
870
871 fn do_read(
872 buf: &str,
873 batch_size: usize,
874 coerce_primitive: bool,
875 strict_mode: bool,
876 schema: SchemaRef,
877 ) -> Vec<RecordBatch> {
878 let mut unbuffered = vec![];
879
880 for batch_size in [1, 3, 100, batch_size] {
882 unbuffered = ReaderBuilder::new(schema.clone())
883 .with_batch_size(batch_size)
884 .with_coerce_primitive(coerce_primitive)
885 .build(Cursor::new(buf.as_bytes()))
886 .unwrap()
887 .collect::<Result<Vec<_>, _>>()
888 .unwrap();
889
890 for b in unbuffered.iter().take(unbuffered.len() - 1) {
891 assert_eq!(b.num_rows(), batch_size)
892 }
893
894 for b in [1, 3, 5] {
896 let buffered = ReaderBuilder::new(schema.clone())
897 .with_batch_size(batch_size)
898 .with_coerce_primitive(coerce_primitive)
899 .with_strict_mode(strict_mode)
900 .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
901 .unwrap()
902 .collect::<Result<Vec<_>, _>>()
903 .unwrap();
904 assert_eq!(unbuffered, buffered);
905 }
906 }
907
908 unbuffered
909 }
910
    // Basic end-to-end decode of ints, floats-as-ints, booleans and dates,
    // including missing fields, nulls, blank lines, and numbers quoted as strings.
    #[test]
    fn test_basic() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Exercise the push-based Decoder API directly first
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        // 221 is the exact byte length of `buf` — the whole input is consumed
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        // Flushing drains all buffered rows
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
977
    // Decodes escape sequences, surrogate pairs and \uXXXX escapes into Utf8
    // and LargeUtf8 columns, including null and empty-string values.
    #[test]
    fn test_string() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        // The \ud83d\ude00 surrogate pair decodes to 😀, \u00FF to ÿ
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        // Empty string is distinct from null
        assert_eq!(col2.value(4), "");
    }
1013
    // Checks that Utf8View decoding allocates a data buffer large enough for
    // all values longer than the 12-byte inline limit, and round-trips values.
    #[test]
    fn test_long_string_view_allocation() {
        // Minimum bytes the view data buffer must hold for the non-inline values
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Buffer 0 of a StringViewArray holds the out-of-line string bytes
        let data_buffer = string_view_array.to_data().buffers()[0].len();

        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }
1066
    // Checks that numbers coerced into a Utf8View column allocate sufficient
    // buffer space and preserve the original textual representation.
    #[test]
    fn test_numeric_view_allocation() {
        // Minimum bytes required for the coerced numeric strings
        let expected_capacity: usize = 33;

        let buf = r#"
        {"n": 123456789}
        {"n": 1000000000000}
        {"n": 3.1415}
        {"n": 2.718281828459045}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        // coerce_primitive = true: numbers become their literal text
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Buffer 0 holds the out-of-line string bytes
        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1111
    // Same inputs as test_string, but column "a" decoded as Utf8View.
    // NOTE(review): "uft8view" in the name is a typo for "utf8view" — left
    // unchanged to avoid renaming the test.
    #[test]
    fn test_string_with_uft8view() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string_view();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        // Surrogate pair and \uXXXX escapes decode identically to the Utf8 path
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert_eq!(col1.data_type(), &DataType::Utf8View);

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }
1148
1149 #[test]
1150 fn test_complex() {
1151 let buf = r#"
1152 {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
1153 {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1154 {"list": null, "nested": {"a": null}}
1155 "#;
1156
1157 let schema = Arc::new(Schema::new(vec![
1158 Field::new_list("list", Field::new("element", DataType::Int32, false), true),
1159 Field::new_struct(
1160 "nested",
1161 vec![
1162 Field::new("a", DataType::Int32, true),
1163 Field::new("b", DataType::Int32, true),
1164 ],
1165 true,
1166 ),
1167 Field::new_struct(
1168 "nested_list",
1169 vec![Field::new_list(
1170 "list2",
1171 Field::new_struct(
1172 "element",
1173 vec![Field::new("c", DataType::Int32, false)],
1174 false,
1175 ),
1176 true,
1177 )],
1178 true,
1179 ),
1180 ]));
1181
1182 let batches = do_read(buf, 1024, false, false, schema);
1183 assert_eq!(batches.len(), 1);
1184
1185 let list = batches[0].column(0).as_list::<i32>();
1186 assert_eq!(list.len(), 3);
1187 assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
1188 assert_eq!(list.null_count(), 1);
1189 assert!(list.is_null(2));
1190 let list_values = list.values().as_primitive::<Int32Type>();
1191 assert_eq!(list_values.values(), &[5, 6]);
1192
1193 let nested = batches[0].column(1).as_struct();
1194 let a = nested.column(0).as_primitive::<Int32Type>();
1195 assert_eq!(list.null_count(), 1);
1196 assert_eq!(a.values(), &[1, 7, 0]);
1197 assert!(list.is_null(2));
1198
1199 let b = nested.column(1).as_primitive::<Int32Type>();
1200 assert_eq!(b.null_count(), 2);
1201 assert_eq!(b.len(), 3);
1202 assert_eq!(b.value(0), 2);
1203 assert!(b.is_null(1));
1204 assert!(b.is_null(2));
1205
1206 let nested_list = batches[0].column(2).as_struct();
1207 assert_eq!(nested_list.len(), 3);
1208 assert_eq!(nested_list.null_count(), 1);
1209 assert!(nested_list.is_null(2));
1210
1211 let list2 = nested_list.column(0).as_list::<i32>();
1212 assert_eq!(list2.len(), 3);
1213 assert_eq!(list2.null_count(), 1);
1214 assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
1215 assert!(list2.is_null(2));
1216
1217 let list2_values = list2.values().as_struct();
1218
1219 let c = list2_values.column(0).as_primitive::<Int32Type>();
1220 assert_eq!(c.values(), &[3, 4]);
1221 }
1222
    // A schema listing only a subset of the JSON keys projects away the
    // unlisted fields, including inside nested structs and lists.
    #[test]
    fn test_projection() {
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        // Note: "list", "nested.b" and "nested_list.list2.c" are not in the schema
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // "d" is present in the first element only; the second is null
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1278
    // Decodes JSON objects into a Map column with list-of-string values,
    // checking entry offsets, nested validity and the formatted output.
    #[test]
    fn test_map() {
        let buf = r#"
        {"map": {"a": ["foo", null]}}
        {"map": {"a": [null], "b": []}}
        {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // Rows contain 1, 2 and 2 entries respectively
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        // The value for key "c" is null (not an empty list)
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1321
1322 #[test]
1323 fn test_not_coercing_primitive_into_string_without_flag() {
1324 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));
1325
1326 let buf = r#"{"a": 1}"#;
1327 let err = ReaderBuilder::new(schema.clone())
1328 .with_batch_size(1024)
1329 .build(Cursor::new(buf.as_bytes()))
1330 .unwrap()
1331 .read()
1332 .unwrap_err();
1333
1334 assert_eq!(
1335 err.to_string(),
1336 "Json error: whilst decoding field 'a': expected string got 1"
1337 );
1338
1339 let buf = r#"{"a": true}"#;
1340 let err = ReaderBuilder::new(schema)
1341 .with_batch_size(1024)
1342 .build(Cursor::new(buf.as_bytes()))
1343 .unwrap()
1344 .read()
1345 .unwrap_err();
1346
1347 assert_eq!(
1348 err.to_string(),
1349 "Json error: whilst decoding field 'a': expected string got true"
1350 );
1351 }
1352
    // With `coerce_primitive` enabled, numbers and booleans are stored as
    // their literal JSON text in string columns; nulls remain nulls.
    #[test]
    fn test_coercing_primitive_into_string() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Coerced values preserve the source text exactly ("2E0", not "2.0")
        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1401
    // Generic harness: decodes ints, floats and quoted strings into a decimal
    // type with scale 2, checking scaled integer representations and nulls.
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Values are stored scaled by 10^2; excess digits are truncated
        // (e.g. "2.0452" -> 204, 123.456 -> 12345)
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1451
    // Runs the decimal harness across all four decimal widths.
    #[test]
    fn test_decimals() {
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1459
    // Generic harness: decodes timestamps from RFC3339 strings (with and
    // without offsets/timezones), space-separated variants, bare dates and raw
    // numbers, for a given timestamp unit. Expected values are expressed in
    // nanoseconds and divided down to the unit under test.
    fn test_timestamp<T: ArrowTimestampType>() {
        // Note the duplicate "b" key in row 3 — the decoder keeps the last value
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        // Column "d" carries an explicit +08:00 timezone; naive strings are
        // interpreted in that zone
        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Scale factor from nanoseconds down to the unit under test
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // Raw numbers (40, 1234) pass through untouched; strings are parsed
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1541
#[test]
fn test_timestamps() {
    // Run the generic timestamp test at every supported resolution.
    test_timestamp::<TimestampSecondType>();
    test_timestamp::<TimestampMillisecondType>();
    test_timestamp::<TimestampMicrosecondType>();
    test_timestamp::<TimestampNanosecondType>();
}
1549
fn test_time<T: ArrowTemporalType>() {
    // Six rows exercising time-of-day parsing: 12-hour forms with AM/PM
    // ("09:26:56.123 AM", "6:00 pm"), 24-hour forms with and without
    // fractional seconds, raw numeric values, a duplicate "b" key (the
    // last occurrence wins), and a blank line that must be skipped.
    let buf = r#"
    {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
    {"a": 2, "b": "23:59:59", "c": 123.456}

    {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
    {"b": 40, "c": "13:42:29.190855"}
    {"b": 1234, "a": null, "c": "09:26:56.123"}
    {"c": "14:26:56.123"}
    "#;

    // Recover the time unit from the concrete Time32/Time64 type under test.
    let unit = match T::DATA_TYPE {
        DataType::Time32(unit) | DataType::Time64(unit) => unit,
        _ => unreachable!(),
    };

    // Expected values are expressed as nanoseconds past midnight and
    // scaled down to the unit under test.
    let unit_in_nanos = match unit {
        TimeUnit::Second => 1_000_000_000,
        TimeUnit::Millisecond => 1_000_000,
        TimeUnit::Microsecond => 1_000,
        TimeUnit::Nanosecond => 1,
    };

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", T::DATA_TYPE, true),
        Field::new("b", T::DATA_TYPE, true),
        Field::new("c", T::DATA_TYPE, true),
    ]));

    let batches = do_read(buf, 1024, true, false, schema);
    assert_eq!(batches.len(), 1);

    let col1 = batches[0].column(0).as_primitive::<T>();
    assert_eq!(col1.null_count(), 4);
    assert!(col1.is_null(2));
    assert!(col1.is_null(3));
    assert!(col1.is_null(4));
    assert!(col1.is_null(5));
    assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

    let col2 = batches[0].column(1).as_primitive::<T>();
    assert_eq!(col2.null_count(), 1);
    assert!(col2.is_null(5));
    // 09:26:56.123 AM, 23:59:59 and "6:00 pm" (18:00) parsed as strings;
    // rows 3-4 take the raw numbers 40 and 1234 (row 2's duplicate 1337
    // is overwritten by the string value).
    assert_eq!(
        col2.values(),
        &[
            34016123000000 / unit_in_nanos,
            86399000000000 / unit_in_nanos,
            64800000000000 / unit_in_nanos,
            40,
            1234,
            0
        ]
        .map(T::Native::usize_as)
    );

    let col3 = batches[0].column(2).as_primitive::<T>();
    assert_eq!(col3.null_count(), 0);
    // Numeric inputs (38.30, 123.456) truncate to integers; the rest are
    // 24-hour strings with fractional seconds.
    assert_eq!(
        col3.values(),
        &[
            38,
            123,
            34016123000000 / unit_in_nanos,
            49349190855000 / unit_in_nanos,
            34016123000000 / unit_in_nanos,
            52016123000000 / unit_in_nanos
        ]
        .map(T::Native::usize_as)
    );
}
1621
#[test]
fn test_times() {
    // Run the generic time-of-day test for both 32- and 64-bit widths.
    test_time::<Time32MillisecondType>();
    test_time::<Time32SecondType>();
    test_time::<Time64MicrosecondType>();
    test_time::<Time64NanosecondType>();
}
1629
1630 fn test_duration<T: ArrowTemporalType>() {
1631 let buf = r#"
1632 {"a": 1, "b": "2"}
1633 {"a": 3, "b": null}
1634 "#;
1635
1636 let schema = Arc::new(Schema::new(vec![
1637 Field::new("a", T::DATA_TYPE, true),
1638 Field::new("b", T::DATA_TYPE, true),
1639 ]));
1640
1641 let batches = do_read(buf, 1024, true, false, schema);
1642 assert_eq!(batches.len(), 1);
1643
1644 let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1645 assert_eq!(col_a.null_count(), 0);
1646 assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1647
1648 let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1649 assert_eq!(col2.null_count(), 1);
1650 assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1651 }
1652
#[test]
fn test_durations() {
    // Run the generic duration test at every supported resolution.
    test_duration::<DurationNanosecondType>();
    test_duration::<DurationMicrosecondType>();
    test_duration::<DurationMillisecondType>();
    test_duration::<DurationSecondType>();
}
1660
#[test]
fn test_delta_checkpoint() {
    // Mimics a Delta Lake checkpoint record: only "protocol" is present
    // in the input, so the nullable "add" struct (which contains a map)
    // must come back as null rather than erroring.
    let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
    let schema = Arc::new(Schema::new(vec![
        Field::new_struct(
            "protocol",
            vec![
                Field::new("minReaderVersion", DataType::Int32, true),
                Field::new("minWriterVersion", DataType::Int32, true),
            ],
            true,
        ),
        Field::new_struct(
            "add",
            vec![Field::new_map(
                "partitionValues",
                "key_value",
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Utf8, true),
                false,
                false,
            )],
            true,
        ),
    ]));

    let batches = do_read(json, 1024, true, false, schema);
    assert_eq!(batches.len(), 1);

    // Render the single row through the display formatter to check both
    // the decoded protocol values and the null "add" column.
    let s: StructArray = batches.into_iter().next().unwrap().into();
    let opts = FormatOptions::default().with_null("null");
    let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
    assert_eq!(
        formatter.value(0).to_string(),
        "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
    );
}
1698
#[test]
fn struct_nullability() {
    // For each child type, verify how a non-nullable child field inside a
    // nullable struct interacts with missing, null and masked values.
    let check = |child: DataType| {
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "foo",
            vec![Field::new("bar", child, false)],
            true,
        )]));

        // A non-nullable child absent from a non-null struct is an error.
        let non_null = r#"{"foo": {}}"#;
        let mut reader = ReaderBuilder::new(schema.clone())
            .build(Cursor::new(non_null.as_bytes()))
            .unwrap();
        assert!(reader.next().unwrap().is_err());

        // A null for the non-nullable child is also an error.
        // NOTE(review): "bar" is unquoted here, so this payload is likely
        // rejected as malformed JSON before nullability is checked —
        // either way the decode must fail; confirm intent upstream.
        let null = r#"{"foo": {bar: null}}"#;
        let mut reader = ReaderBuilder::new(schema.clone())
            .build(Cursor::new(null.as_bytes()))
            .unwrap();
        assert!(reader.next().unwrap().is_err());

        // A null struct masks its non-nullable children entirely: the
        // row decodes, and both the struct and its child read as null.
        let null = r#"{"foo": null}"#;
        let mut reader = ReaderBuilder::new(schema)
            .build(Cursor::new(null.as_bytes()))
            .unwrap();
        let batch = reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);

        let foo = batch.column(0).as_struct();
        assert_eq!(foo.len(), 1);
        assert!(foo.is_null(0));
        assert_eq!(foo.num_columns(), 1);

        let bar = foo.column(0);
        assert_eq!(bar.len(), 1);
        assert!(bar.is_null(0));
    };

    check(DataType::Boolean);
    check(DataType::Int32);
    check(DataType::Utf8);
    check(DataType::Decimal128(2, 1));
    check(DataType::Timestamp(
        TimeUnit::Microsecond,
        Some("+00:00".into()),
    ));
}
1747
#[test]
fn test_truncation() {
    // i64/u64 extremes must round-trip exactly whether written as JSON
    // numbers or as quoted strings (numbers this large would lose
    // precision if routed through f64).
    let buf = r#"
    {"i64": 9223372036854775807, "u64": 18446744073709551615 }
    {"i64": "9223372036854775807", "u64": "18446744073709551615" }
    {"i64": -9223372036854775808, "u64": 0 }
    {"i64": "-9223372036854775808", "u64": 0 }
    "#;

    let schema = Arc::new(Schema::new(vec![
        Field::new("i64", DataType::Int64, true),
        Field::new("u64", DataType::UInt64, true),
    ]));

    let batches = do_read(buf, 1024, true, false, schema);
    assert_eq!(batches.len(), 1);

    let i64 = batches[0].column(0).as_primitive::<Int64Type>();
    assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);

    let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
    assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
}
1771
#[test]
fn test_timestamp_truncation() {
    // Nanosecond timestamps must accept the full i64 range without
    // precision loss, plus scientific notation (9e5 == 900000).
    let buf = r#"
    {"time": 9223372036854775807 }
    {"time": -9223372036854775808 }
    {"time": 9e5 }
    "#;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "time",
        DataType::Timestamp(TimeUnit::Nanosecond, None),
        true,
    )]));

    let batches = do_read(buf, 1024, true, false, schema);
    assert_eq!(batches.len(), 1);

    let i64 = batches[0]
        .column(0)
        .as_primitive::<TimestampNanosecondType>();
    assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
}
1794
#[test]
fn test_strict_mode_no_missing_columns_in_schema() {
    // Strict mode succeeds when every JSON key is covered by the schema,
    // both for a flat schema...
    let buf = r#"
    {"a": 1, "b": "2", "c": true}
    {"a": 2E0, "b": "4", "c": false}
    "#;

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int16, false),
        Field::new("b", DataType::Utf8, false),
        Field::new("c", DataType::Boolean, false),
    ]));

    let batches = do_read(buf, 1024, true, true, schema);
    assert_eq!(batches.len(), 1);

    // ...and for a schema with a nested struct field.
    let buf = r#"
    {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
    {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
    "#;

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int16, false),
        Field::new("b", DataType::Utf8, false),
        Field::new_struct(
            "c",
            vec![
                Field::new("a", DataType::Boolean, false),
                Field::new("b", DataType::Int16, false),
            ],
            false,
        ),
    ]));

    let batches = do_read(buf, 1024, true, true, schema);
    assert_eq!(batches.len(), 1);
}
1832
#[test]
fn test_strict_mode_missing_columns_in_schema() {
    // Strict mode must error on JSON keys absent from the schema: first
    // a top-level key ("b" is not in the schema)...
    let buf = r#"
    {"a": 1, "b": "2", "c": true}
    {"a": 2E0, "b": "4", "c": false}
    "#;

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int16, true),
        Field::new("c", DataType::Boolean, true),
    ]));

    let err = ReaderBuilder::new(schema)
        .with_batch_size(1024)
        .with_strict_mode(true)
        .build(Cursor::new(buf.as_bytes()))
        .unwrap()
        .read()
        .unwrap_err();

    assert_eq!(
        err.to_string(),
        "Json error: column 'b' missing from schema"
    );

    // ...then a key nested inside a struct; the error message must be
    // prefixed with the enclosing field's path.
    let buf = r#"
    {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
    {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
    "#;

    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int16, false),
        Field::new("b", DataType::Utf8, false),
        Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
    ]));

    let err = ReaderBuilder::new(schema)
        .with_batch_size(1024)
        .with_strict_mode(true)
        .build(Cursor::new(buf.as_bytes()))
        .unwrap()
        .read()
        .unwrap_err();

    assert_eq!(
        err.to_string(),
        "Json error: whilst decoding field 'c': column 'b' missing from schema"
    );
}
1882
1883 fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1884 let file = File::open(path).unwrap();
1885 let mut reader = BufReader::new(file);
1886 let schema = schema.unwrap_or_else(|| {
1887 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1888 reader.rewind().unwrap();
1889 schema
1890 });
1891 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1892 builder.build(reader).unwrap()
1893 }
1894
#[test]
fn test_json_basic() {
    // End-to-end read of test/data/basic.json with an inferred schema:
    // checks column order, inferred types, and spot-checks values.
    let mut reader = read_file("test/data/basic.json", None);
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(8, batch.num_columns());
    assert_eq!(12, batch.num_rows());

    // The reader's schema and the batch's schema must agree.
    let schema = reader.schema();
    let batch_schema = batch.schema();
    assert_eq!(schema, batch_schema);

    let a = schema.column_with_name("a").unwrap();
    assert_eq!(0, a.0);
    assert_eq!(&DataType::Int64, a.1.data_type());
    let b = schema.column_with_name("b").unwrap();
    assert_eq!(1, b.0);
    assert_eq!(&DataType::Float64, b.1.data_type());
    let c = schema.column_with_name("c").unwrap();
    assert_eq!(2, c.0);
    assert_eq!(&DataType::Boolean, c.1.data_type());
    let d = schema.column_with_name("d").unwrap();
    assert_eq!(3, d.0);
    assert_eq!(&DataType::Utf8, d.1.data_type());

    // Spot-check decoded values in each of the four typed columns.
    let aa = batch.column(a.0).as_primitive::<Int64Type>();
    assert_eq!(1, aa.value(0));
    assert_eq!(-10, aa.value(1));
    let bb = batch.column(b.0).as_primitive::<Float64Type>();
    assert_eq!(2.0, bb.value(0));
    assert_eq!(-3.5, bb.value(1));
    let cc = batch.column(c.0).as_boolean();
    assert!(!cc.value(0));
    assert!(cc.value(10));
    let dd = batch.column(d.0).as_string::<i32>();
    assert_eq!("4", dd.value(0));
    assert_eq!("text", dd.value(8));
}
1933
#[test]
fn test_json_empty_projection() {
    // An empty schema still counts rows: the batch has zero columns but
    // the full row count of the file.
    let batch = read_file("test/data/basic.json", Some(Schema::empty()))
        .next()
        .unwrap()
        .unwrap();

    assert_eq!(batch.num_columns(), 0);
    assert_eq!(batch.num_rows(), 12);
}
1942
#[test]
fn test_json_basic_with_nulls() {
    // Read test/data/basic_nulls.json with an inferred schema and verify
    // the validity (null) bitmaps of every column.
    let mut reader = read_file("test/data/basic_nulls.json", None);
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(4, batch.num_columns());
    assert_eq!(12, batch.num_rows());

    let schema = reader.schema();
    let batch_schema = batch.schema();
    assert_eq!(schema, batch_schema);

    let a = schema.column_with_name("a").unwrap();
    assert_eq!(&DataType::Int64, a.1.data_type());
    let b = schema.column_with_name("b").unwrap();
    assert_eq!(&DataType::Float64, b.1.data_type());
    let c = schema.column_with_name("c").unwrap();
    assert_eq!(&DataType::Boolean, c.1.data_type());
    let d = schema.column_with_name("d").unwrap();
    assert_eq!(&DataType::Utf8, d.1.data_type());

    // Spot-check which rows are valid vs null in each column.
    let aa = batch.column(a.0).as_primitive::<Int64Type>();
    assert!(aa.is_valid(0));
    assert!(!aa.is_valid(1));
    assert!(!aa.is_valid(11));
    let bb = batch.column(b.0).as_primitive::<Float64Type>();
    assert!(bb.is_valid(0));
    assert!(!bb.is_valid(2));
    assert!(!bb.is_valid(11));
    let cc = batch.column(c.0).as_boolean();
    assert!(cc.is_valid(0));
    assert!(!cc.is_valid(4));
    assert!(!cc.is_valid(11));
    let dd = batch.column(d.0).as_string::<i32>();
    assert!(!dd.is_valid(0));
    assert!(dd.is_valid(1));
    assert!(!dd.is_valid(4));
    assert!(!dd.is_valid(11));
}
1982
#[test]
fn test_json_basic_schema() {
    // Supplying an explicit schema both projects the file down to the
    // listed fields and coerces types (here "b" is read as Float32
    // instead of the inferred Float64).
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Float32, false),
        Field::new("c", DataType::Boolean, false),
        Field::new("d", DataType::Utf8, false),
    ]);

    let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
    let reader_schema = reader.schema();
    assert_eq!(reader_schema.as_ref(), &schema);
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(4, batch.num_columns());
    assert_eq!(12, batch.num_rows());

    let schema = batch.schema();

    let a = schema.column_with_name("a").unwrap();
    assert_eq!(&DataType::Int64, a.1.data_type());
    let b = schema.column_with_name("b").unwrap();
    assert_eq!(&DataType::Float32, b.1.data_type());
    let c = schema.column_with_name("c").unwrap();
    assert_eq!(&DataType::Boolean, c.1.data_type());
    let d = schema.column_with_name("d").unwrap();
    assert_eq!(&DataType::Utf8, d.1.data_type());

    let aa = batch.column(a.0).as_primitive::<Int64Type>();
    assert_eq!(1, aa.value(0));
    assert_eq!(100000000000000, aa.value(11));
    let bb = batch.column(b.0).as_primitive::<Float32Type>();
    assert_eq!(2.0, bb.value(0));
    assert_eq!(-3.5, bb.value(1));
}
2018
#[test]
fn test_json_basic_schema_projection() {
    // A two-field schema projects the 8-column file down to exactly
    // those columns, in schema order.
    let schema = Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("c", DataType::Boolean, false),
    ]);

    let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(batch.num_columns(), 2);
    assert_eq!(batch.schema().fields().len(), 2);
    assert_eq!(batch.num_rows(), 12);
    assert_eq!(batch.schema().as_ref(), &schema);

    let (a_idx, a_field) = schema.column_with_name("a").unwrap();
    assert_eq!(a_idx, 0);
    assert_eq!(a_field.data_type(), &DataType::Int64);

    let (c_idx, c_field) = schema.column_with_name("c").unwrap();
    assert_eq!(c_idx, 1);
    assert_eq!(c_field.data_type(), &DataType::Boolean);
}
2042
#[test]
fn test_json_arrays() {
    // Reads test/data/arrays.json, which carries list columns "b"
    // (Float64) and "c" (Boolean) alongside scalar columns "a" and "d".
    let mut reader = read_file("test/data/arrays.json", None);
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(4, batch.num_columns());
    assert_eq!(3, batch.num_rows());

    let schema = batch.schema();

    let a = schema.column_with_name("a").unwrap();
    assert_eq!(&DataType::Int64, a.1.data_type());
    let b = schema.column_with_name("b").unwrap();
    assert_eq!(
        &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
        b.1.data_type()
    );
    let c = schema.column_with_name("c").unwrap();
    assert_eq!(
        &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
        c.1.data_type()
    );
    let d = schema.column_with_name("d").unwrap();
    assert_eq!(&DataType::Utf8, d.1.data_type());

    let aa = batch.column(a.0).as_primitive::<Int64Type>();
    assert_eq!(1, aa.value(0));
    assert_eq!(-10, aa.value(1));
    assert_eq!(1627668684594000000, aa.value(2));

    // Inspect the flattened child values of the "b" lists directly.
    let bb = batch.column(b.0).as_list::<i32>();
    let bb = bb.values().as_primitive::<Float64Type>();
    assert_eq!(9, bb.len());
    assert_eq!(2.0, bb.value(0));
    assert_eq!(-6.1, bb.value(5));
    assert!(!bb.is_valid(7));

    // Use as_list, consistent with "b", rather than a manual
    // as_any().downcast_ref::<ListArray>().
    let cc = batch.column(c.0).as_list::<i32>();
    let cc = cc.values().as_boolean();
    assert_eq!(6, cc.len());
    assert!(!cc.value(0));
    assert!(!cc.value(4));
    assert!(!cc.is_valid(5));
}
2090
#[test]
fn test_empty_json_arrays() {
    // A list column of Null elements: "[]" decodes to a valid empty
    // list, while an explicit null and a missing key both decode to a
    // null list entry.
    let json_content = r#"
    {"items": []}
    {"items": null}
    {}
    "#;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "items",
        DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
        true,
    )]));

    let batches = do_read(json_content, 1024, false, false, schema);
    assert_eq!(batches.len(), 1);

    let col1 = batches[0].column(0).as_list::<i32>();
    assert_eq!(col1.null_count(), 2);
    assert!(col1.value(0).is_empty());
    assert_eq!(col1.value(0).data_type(), &DataType::Null);
    assert!(col1.is_null(1));
    assert!(col1.is_null(2));
}
2115
#[test]
fn test_nested_empty_json_arrays() {
    // Lists of lists of Null: empty inner arrays and inner arrays of
    // nulls must produce the correct lengths at both nesting levels.
    let json_content = r#"
    {"items": [[],[]]}
    {"items": [[null, null],[null]]}
    "#;

    let schema = Arc::new(Schema::new(vec![Field::new(
        "items",
        DataType::List(FieldRef::new(Field::new_list_field(
            DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
            true,
        ))),
        true,
    )]));

    let batches = do_read(json_content, 1024, false, false, schema);
    assert_eq!(batches.len(), 1);

    let col1 = batches[0].column(0).as_list::<i32>();
    assert_eq!(col1.null_count(), 0);
    // Row 0: two empty inner lists.
    assert_eq!(col1.value(0).len(), 2);
    assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
    assert!(col1.value(0).as_list::<i32>().value(1).is_empty());

    // Row 1: inner lists of lengths 2 and 1 (all null elements).
    assert_eq!(col1.value(1).len(), 2);
    assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
    assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
}
2145
#[test]
fn test_nested_list_json_arrays() {
    // List<Struct<b: Boolean, c: Struct<d: Utf8>>> with every null
    // combination: null inner struct, null leaf, null list, empty list,
    // and a list containing a null element. The expected arrays are
    // assembled by hand via ArrayDataBuilder and compared to the
    // decoder's output.
    let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
    let a_struct_field = Field::new_struct(
        "a",
        vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
        true,
    );
    let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
    let schema = Arc::new(Schema::new(vec![a_field.clone()]));
    let builder = ReaderBuilder::new(schema).with_batch_size(64);
    let json_content = r#"
    {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
    {"a": [{"b": false, "c": null}]}
    {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
    {"a": null}
    {"a": []}
    {"a": [null]}
    "#;
    let mut reader = builder.build(Cursor::new(json_content)).unwrap();

    // Flattened leaf "d": 7 struct elements across all rows; index 2 is
    // the null "c" and index 6 the null list element.
    let d = StringArray::from(vec![
        Some("a_text"),
        Some("b_text"),
        None,
        Some("c_text"),
        Some("d_text"),
        None,
        None,
    ]);
    // "c" validity 0b00111011: elements 2 (explicit null) and 6 (masked
    // by the null struct) are null.
    let c = ArrayDataBuilder::new(c_field.data_type().clone())
        .len(7)
        .add_child_data(d.to_data())
        .null_bit_buffer(Some(Buffer::from([0b00111011])))
        .build()
        .unwrap();
    let b = BooleanArray::from(vec![
        Some(true),
        Some(false),
        Some(false),
        Some(true),
        None,
        Some(true),
        None,
    ]);
    // Inner struct validity 0b00111111: only element 6 (the literal
    // `null` inside the last list) is null.
    let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
        .len(7)
        .add_child_data(b.to_data())
        .add_child_data(c.clone())
        .null_bit_buffer(Some(Buffer::from([0b00111111])))
        .build()
        .unwrap();
    // List offsets [0,2,3,6,6,6,7]; validity 0b00110111 marks row 3
    // ({"a": null}) as the only null list (row 4 is a valid empty list).
    let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
        .len(6)
        .add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
        .add_child_data(a)
        .null_bit_buffer(Some(Buffer::from([0b00110111])))
        .build()
        .unwrap();
    let expected = make_array(a_list);

    let batch = reader.next().unwrap().unwrap();
    let read = batch.column(0);
    assert_eq!(read.len(), 6);
    let read: &ListArray = read.as_list::<i32>();
    let expected = expected.as_list::<i32>();
    assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
    assert_eq!(read.nulls(), expected.nulls());

    // Compare the nested pieces individually before the final whole-array
    // equality check, to localize failures.
    let struct_array = read.values().as_struct();
    let expected_struct_array = expected.values().as_struct();

    assert_eq!(7, struct_array.len());
    assert_eq!(1, struct_array.null_count());
    assert_eq!(7, expected_struct_array.len());
    assert_eq!(1, expected_struct_array.null_count());
    assert_eq!(struct_array.nulls(), expected_struct_array.nulls());

    let read_b = struct_array.column(0);
    assert_eq!(read_b.as_ref(), &b);
    let read_c = struct_array.column(1);
    assert_eq!(read_c.to_data(), c);
    let read_c = read_c.as_struct();
    let read_d = read_c.column(0);
    assert_eq!(read_d.as_ref(), &d);

    assert_eq!(read, expected);
}
2239
// Generic driver for ListView/LargeListView decoding, parameterized over
// the offset width `O`.
fn assert_read_list_view<O: OffsetSizeTrait>() {
    let field = Arc::new(Field::new("item", DataType::Int32, true));
    let data_type = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(field.clone());
    let schema = Arc::new(Schema::new(vec![Field::new("lv", data_type, true)]));

    // Rows cover a multi-element list, a list with a null element, a
    // null list, a single-element list and an empty list.
    let buf = r#"
    {"lv": [1, 2, 3]}
    {"lv": [4, null]}
    {"lv": null}
    {"lv": [6]}
    {"lv": []}
    "#;

    let batches = do_read(buf, 1024, false, false, schema);
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];
    let col = batch.column(0);
    let list_view = col
        .as_any()
        .downcast_ref::<GenericListViewArray<O>>()
        .unwrap();

    assert_eq!(list_view.len(), 5);

    // ListView stores explicit (offset, size) pairs per row; the null
    // row and the empty row both have size 0 (the null row reuses the
    // previous offset).
    let expected_offsets: Vec<O> = vec![0, 3, 5, 5, 6]
        .into_iter()
        .map(|v| O::usize_as(v))
        .collect();
    let expected_sizes: Vec<O> = vec![3, 2, 0, 1, 0]
        .into_iter()
        .map(|v| O::usize_as(v))
        .collect();
    assert_eq!(list_view.value_offsets(), &expected_offsets);
    assert_eq!(list_view.value_sizes(), &expected_sizes);

    // Row 0: [1, 2, 3].
    assert!(list_view.is_valid(0));
    let vals = list_view.value(0);
    let ints = vals.as_primitive::<Int32Type>();
    assert_eq!(ints.values(), &[1, 2, 3]);

    // Row 1: [4, null].
    assert!(list_view.is_valid(1));
    let vals = list_view.value(1);
    let ints = vals.as_primitive::<Int32Type>();
    assert_eq!(ints.len(), 2);
    assert_eq!(ints.value(0), 4);
    assert!(ints.is_null(1));

    // Row 2: null list.
    assert!(list_view.is_null(2));

    // Row 3: [6].
    assert!(list_view.is_valid(3));
    let vals = list_view.value(3);
    let ints = vals.as_primitive::<Int32Type>();
    assert_eq!(ints.values(), &[6]);

    // Row 4: valid but empty.
    assert!(list_view.is_valid(4));
    let vals = list_view.value(4);
    assert_eq!(vals.len(), 0);
}
2304
#[test]
fn test_read_list_view() {
    // Exercise both ListView (i32 offsets) and LargeListView (i64).
    assert_read_list_view::<i32>();
    assert_read_list_view::<i64>();
}
2310
#[test]
fn test_skip_empty_lines() {
    // The payload begins with a blank first line and has surrounding
    // whitespace; the reader must skip it and still decode three rows.
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
    let json_content = "
    {\"a\": 1}
    {\"a\": 2}
    {\"a\": 3}";
    let mut reader = builder.build(Cursor::new(json_content)).unwrap();
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(1, batch.num_columns());
    assert_eq!(3, batch.num_rows());

    let schema = reader.schema();
    let c = schema.column_with_name("a").unwrap();
    assert_eq!(&DataType::Int64, c.1.data_type());
}
2329
#[test]
fn test_with_multiple_batches() {
    // With batch_size 5, the 12-row file must split into batches of
    // 5, 5 and 2 rows.
    let file = File::open("test/data/basic_nulls.json").unwrap();
    let mut buf = BufReader::new(file);
    let (schema, _) = infer_json_schema(&mut buf, None).unwrap();
    buf.rewind().unwrap();

    let reader = ReaderBuilder::new(Arc::new(schema))
        .with_batch_size(5)
        .build(buf)
        .unwrap();

    // The reader is an iterator of Result<RecordBatch>; collect the row
    // count of each batch in order.
    let num_records: Vec<_> = reader.map(|rb| rb.unwrap().num_rows()).collect();
    assert_eq!(num_records, vec![5, 5, 2]);
}
2347
#[test]
fn test_timestamp_from_json_seconds() {
    // Read column "a" of basic_nulls.json as Timestamp(Second): numeric
    // JSON values are taken as raw second counts, nulls preserved.
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Second, None),
        true,
    )]);

    let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(1, batch.num_columns());
    assert_eq!(12, batch.num_rows());

    let schema = reader.schema();
    let batch_schema = batch.schema();
    assert_eq!(schema, batch_schema);

    let a = schema.column_with_name("a").unwrap();
    assert_eq!(
        &DataType::Timestamp(TimeUnit::Second, None),
        a.1.data_type()
    );

    let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
    assert!(aa.is_valid(0));
    assert!(!aa.is_valid(1));
    assert!(!aa.is_valid(2));
    assert_eq!(1, aa.value(0));
    assert_eq!(1, aa.value(3));
    assert_eq!(5, aa.value(7));
}
2380
#[test]
fn test_timestamp_from_json_milliseconds() {
    // Same as the seconds variant, reading column "a" as
    // Timestamp(Millisecond): numeric values taken as raw counts.
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    )]);

    let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(1, batch.num_columns());
    assert_eq!(12, batch.num_rows());

    let schema = reader.schema();
    let batch_schema = batch.schema();
    assert_eq!(schema, batch_schema);

    let a = schema.column_with_name("a").unwrap();
    assert_eq!(
        &DataType::Timestamp(TimeUnit::Millisecond, None),
        a.1.data_type()
    );

    let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
    assert!(aa.is_valid(0));
    assert!(!aa.is_valid(1));
    assert!(!aa.is_valid(2));
    assert_eq!(1, aa.value(0));
    assert_eq!(1, aa.value(3));
    assert_eq!(5, aa.value(7));
}
2413
#[test]
fn test_date_from_json_milliseconds() {
    // Read column "a" as Date64 (milliseconds since epoch): numeric JSON
    // values are taken as raw counts, nulls preserved.
    let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);

    let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(1, batch.num_columns());
    assert_eq!(12, batch.num_rows());

    let schema = reader.schema();
    let batch_schema = batch.schema();
    assert_eq!(schema, batch_schema);

    let a = schema.column_with_name("a").unwrap();
    assert_eq!(&DataType::Date64, a.1.data_type());

    let aa = batch.column(a.0).as_primitive::<Date64Type>();
    assert!(aa.is_valid(0));
    assert!(!aa.is_valid(1));
    assert!(!aa.is_valid(2));
    assert_eq!(1, aa.value(0));
    assert_eq!(1, aa.value(3));
    assert_eq!(5, aa.value(7));
}
2439
#[test]
fn test_time_from_json_nanoseconds() {
    // Read column "a" as Time64(Nanosecond): numeric JSON values are
    // taken as raw nanosecond counts, nulls preserved.
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Time64(TimeUnit::Nanosecond),
        true,
    )]);

    let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
    let batch = reader.next().unwrap().unwrap();

    assert_eq!(1, batch.num_columns());
    assert_eq!(12, batch.num_rows());

    let schema = reader.schema();
    let batch_schema = batch.schema();
    assert_eq!(schema, batch_schema);

    let a = schema.column_with_name("a").unwrap();
    assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());

    let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
    assert!(aa.is_valid(0));
    assert!(!aa.is_valid(1));
    assert!(!aa.is_valid(2));
    assert_eq!(1, aa.value(0));
    assert_eq!(1, aa.value(3));
    assert_eq!(5, aa.value(7));
}
2469
#[test]
fn test_json_iterator() {
    // Drive the Reader through its Iterator implementation with a small
    // batch size, accumulating totals across all batches.
    let file = File::open("test/data/basic.json").unwrap();
    let mut reader = BufReader::new(file);
    let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
    reader.rewind().unwrap();

    let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
    let reader = builder.build(reader).unwrap();
    let schema = reader.schema();
    let (col_a_index, _) = schema.column_with_name("a").unwrap();

    let mut sum_num_rows = 0;
    let mut num_batches = 0;
    let mut sum_a = 0;
    for batch in reader {
        let batch = batch.unwrap();
        assert_eq!(8, batch.num_columns());
        sum_num_rows += batch.num_rows();
        num_batches += 1;
        // Every batch must carry the reader's schema.
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);
        let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
        sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
    }
    // 12 rows in batches of 5, 5 and 2; the "a" values sum to
    // 100000000000011.
    assert_eq!(12, sum_num_rows);
    assert_eq!(3, num_batches);
    assert_eq!(100000000000011, sum_a);
}
2499
    #[test]
    fn test_decoder_error() {
        // Schema: one nullable struct column "a" with a non-nullable Int32
        // child "child".
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "a",
            vec![Field::new("child", DataType::Int32, false)],
            true,
        )]));

        // A truncated row stays buffered as a partial row, and a failed flush
        // must not discard that buffered state.
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
        let _ = decoder.flush().unwrap_err();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

        // Helper: decode a single row and return the resulting error message.
        let parse_err = |s: &str| {
            ReaderBuilder::new(schema.clone())
                .build(Cursor::new(s.as_bytes()))
                .unwrap()
                .next()
                .unwrap()
                .unwrap_err()
                .to_string()
        };

        // Type mismatches on the struct column itself: the error echoes the
        // offending JSON value verbatim.
        let err = parse_err(r#"{"a": 123}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got 123"
        );

        let err = parse_err(r#"{"a": ["bar"]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
        );

        let err = parse_err(r#"{"a": []}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got []"
        );

        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
        );

        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
        );

        let err = parse_err(r#"{"a": true}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got true"
        );

        let err = parse_err(r#"{"a": false}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got false"
        );

        let err = parse_err(r#"{"a": "foo"}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got \"foo\""
        );

        // Type mismatches on the nested child: the error path nests the field
        // names outermost-first.
        let err = parse_err(r#"{"a": {"child": false}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
        );

        let err = parse_err(r#"{"a": {"child": []}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
        );

        let err = parse_err(r#"{"a": {"child": [123]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
        );

        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
        );
    }
2598
    #[test]
    fn test_serialize_timestamp() {
        // Timestamps may arrive as integer epoch seconds or as RFC 3339
        // strings; both must decode into a TimestampSecond column.
        let json = vec![
            json!({"timestamp": 1681319393}),
            json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
        ];
        let schema = Schema::new(vec![Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Second, None),
            true,
        )]);
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 1);
        let values = batch.column(0).as_primitive::<TimestampSecondType>();
        // "1970-01-01T00:00:00+02:00" is two hours before the epoch => -7200s.
        assert_eq!(values.values(), &[1681319393, -7200]);
    }
2620
    #[test]
    fn test_serialize_decimal() {
        // Decimals may arrive as JSON numbers or strings, either fractional or
        // integral; all must decode into Decimal128(10, 3).
        let json = vec![
            json!({"decimal": 1.234}),
            json!({"decimal": "1.234"}),
            json!({"decimal": 1234}),
            json!({"decimal": "1234"}),
        ];
        let schema = Schema::new(vec![Field::new(
            "decimal",
            DataType::Decimal128(10, 3),
            true,
        )]);
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(batch.num_columns(), 1);
        let values = batch.column(0).as_primitive::<Decimal128Type>();
        // Raw values are scaled by 10^3: 1.234 => 1234, 1234 => 1234000.
        assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
    }
2644
2645 #[test]
2646 fn test_serde_field() {
2647 let field = Field::new("int", DataType::Int32, true);
2648 let mut decoder = ReaderBuilder::new_with_field(field)
2649 .build_decoder()
2650 .unwrap();
2651 decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2652 let b = decoder.flush().unwrap().unwrap();
2653 let values = b.column(0).as_primitive::<Int32Type>().values();
2654 assert_eq!(values, &[1, 2, 3, 4]);
2655 }
2656
    #[test]
    fn test_serde_large_numbers() {
        // u64 values beyond i32 range must survive serde into an Int64 column.
        let field = Field::new("int", DataType::Int64, true);
        let mut decoder = ReaderBuilder::new_with_field(field)
            .build_decoder()
            .unwrap();

        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
        let b = decoder.flush().unwrap().unwrap();
        let values = b.column(0).as_primitive::<Int64Type>().values();
        assert_eq!(values, &[1699148028689, 2, 3, 4]);

        // The same integers must also decode into a microsecond timestamp
        // column without loss.
        let field = Field::new(
            "int",
            DataType::Timestamp(TimeUnit::Microsecond, None),
            true,
        );
        let mut decoder = ReaderBuilder::new_with_field(field)
            .build_decoder()
            .unwrap();

        decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
        let b = decoder.flush().unwrap().unwrap();
        let values = b
            .column(0)
            .as_primitive::<TimestampMicrosecondType>()
            .values();
        assert_eq!(values, &[1699148028689, 2, 3, 4]);
    }
2686
    #[test]
    fn test_coercing_primitive_into_string_decoder() {
        // With coerce_primitive enabled, JSON numbers and booleans decode into
        // Utf8 columns as their literal text rather than erroring.
        let buf = &format!(
            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
            (i32::MAX as i64 + 10),
            i64::MAX - 10
        );
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float64, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]);
        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
        let schema_ref = Arc::new(schema);

        let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
        let mut decoder = reader.build_decoder().unwrap();
        decoder.serialize(json_array.as_slice()).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(
            batch,
            RecordBatch::try_new(
                schema_ref,
                vec![
                    Arc::new(Float64Array::from(vec![
                        1.0,
                        2.0,
                        (i32::MAX as i64 + 10) as f64,
                        (i64::MAX - 10) as f64
                    ])),
                    // Numbers 123/789 become their string representations
                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
                    // Booleans become "false"/"true"
                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
                ]
            )
            .unwrap()
        );
    }
2725
2726 fn _parse_structs(
2731 row: &str,
2732 struct_mode: StructMode,
2733 fields: Fields,
2734 as_struct: bool,
2735 ) -> Result<RecordBatch, ArrowError> {
2736 let builder = if as_struct {
2737 ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2738 } else {
2739 ReaderBuilder::new(Arc::new(Schema::new(fields)))
2740 };
2741 builder
2742 .with_struct_mode(struct_mode)
2743 .build(Cursor::new(row.as_bytes()))
2744 .unwrap()
2745 .next()
2746 .unwrap()
2747 }
2748
    #[test]
    fn test_struct_decoding_list_length() {
        use arrow_array::array;

        // In ListOnly mode rows are positional: the element count must match
        // the field count exactly, both at the top level and nested in "r".
        let row = "[1, 2]";

        let mut fields = vec![Field::new("a", DataType::Int32, true)];
        let too_few_fields = Fields::from(fields.clone());
        fields.push(Field::new("b", DataType::Int32, true));
        let correct_fields = Fields::from(fields.clone());
        fields.push(Field::new("c", DataType::Int32, true));
        let too_many_fields = Fields::from(fields.clone());

        let parse = |fields: Fields, as_struct: bool| {
            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
        };

        let expected_row = StructArray::new(
            correct_fields.clone(),
            vec![
                Arc::new(array::Int32Array::from(vec![1])),
                Arc::new(array::Int32Array::from(vec![2])),
            ],
            None,
        );
        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

        // One field for two values: extra columns error.
        assert_eq!(
            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(too_few_fields, false).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        // Exact match decodes in both struct-wrapped and schema form.
        assert_eq!(
            parse(correct_fields.clone(), true).unwrap(),
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![row_field])),
                vec![Arc::new(expected_row.clone())]
            )
            .unwrap()
        );
        assert_eq!(
            parse(correct_fields, false).unwrap(),
            RecordBatch::from(expected_row)
        );
        // Three fields for two values: missing columns error.
        assert_eq!(
            parse(too_many_fields.clone(), true)
                .unwrap_err()
                .to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
        assert_eq!(
            parse(too_many_fields, false).unwrap_err().to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
    }
2807
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        // The same logical row written three ways: pure object form, pure
        // positional list form, and a mix of the two.
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        // Expected map value: {"d": 3}.
        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly accepts the object form and rejects any list syntax,
        // whether at the top level or nested.
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly is the mirror image: list form decodes, object syntax is
        // rejected wherever it appears.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
2891
    #[test]
    fn test_struct_decoding_empty_list() {
        // An empty JSON list can never satisfy a one-field schema, whether it
        // appears at the top level or nested inside the struct column "r".
        let int_field = Field::new("a", DataType::Int32, true);
        let struct_field = Field::new(
            "r",
            DataType::Struct(Fields::from(vec![int_field.clone()])),
            true,
        );

        let parse = |row: &str, as_struct: bool, field: Field| {
            _parse_structs(
                row,
                StructMode::ListOnly,
                Fields::from(vec![field]),
                as_struct,
            )
        };

        assert_eq!(
            parse("[]", true, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: found 0 columns for 1 fields".to_owned()
        );
        assert_eq!(
            parse("[]", false, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: found 0 columns for 1 fields".to_owned()
        );
        assert_eq!(
            parse("[]", false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: found 0 columns for 1 fields".to_owned()
        );
        // A nested empty list reports the enclosing field in the error path.
        assert_eq!(
            parse("[[]]", false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
        );
    }
2941
    #[test]
    fn test_decode_list_struct_with_wrong_types() {
        // A string where an Int32 is expected fails with a parse error whose
        // field path reflects the nesting depth.
        let int_field = Field::new("a", DataType::Int32, true);
        let struct_field = Field::new(
            "r",
            DataType::Struct(Fields::from(vec![int_field.clone()])),
            true,
        );

        let parse = |row: &str, as_struct: bool, field: Field| {
            _parse_structs(
                row,
                StructMode::ListOnly,
                Fields::from(vec![field]),
                as_struct,
            )
        };

        // Nested in struct "r": path is 'r' then 'a'.
        assert_eq!(
            parse(r#"[["a"]]"#, false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"[["a"]]"#, true, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        // Top-level int column: path is just 'a'.
        assert_eq!(
            parse(r#"["a"]"#, true, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"["a"]"#, false, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
    }
2986
    #[test]
    fn test_type_conflict_nulls() {
        // One nullable column per decoder kind; with ignore_type_conflicts
        // enabled, a value of the wrong JSON type decodes as null instead of
        // erroring.
        let schema = Schema::new(vec![
            Field::new("null", DataType::Null, true),
            Field::new("bool", DataType::Boolean, true),
            Field::new("primitive", DataType::Int32, true),
            Field::new("numeric", DataType::Decimal128(10, 3), true),
            Field::new("string", DataType::Utf8, true),
            Field::new("string_view", DataType::Utf8View, true),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Second, None),
                true,
            ),
            Field::new(
                "array",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new(
                "map",
                DataType::Map(
                    Arc::new(Field::new(
                        "entries",
                        DataType::Struct(Fields::from(vec![
                            Field::new("keys", DataType::Utf8, false),
                            Field::new("values", DataType::Utf8, true),
                        ])),
                        false, // entries not nullable
                    )),
                    false, // keys not sorted
                ),
                true,
            ),
            Field::new(
                "struct",
                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
                true,
            ),
        ]);

        // One candidate value per column, listed in schema order so that the
        // i-th value matches the i-th column's type.
        let json_values = vec![
            json!(null),
            json!(true),
            json!(42),
            json!(1.234),
            json!("hi"),
            json!("ho"),
            json!("1970-01-01T00:00:00+02:00"),
            json!([1, "ho", 3]),
            json!({"k": "value"}),
            json!({"a": 1}),
        ];

        // Build 10 rows, each a rotation of json_values, so every column sees
        // every candidate value exactly once across the batch.
        let json: Vec<_> = (0..json_values.len())
            .map(|i| {
                let pairs = json_values[i..]
                    .iter()
                    .chain(json_values[..i].iter())
                    .zip(&schema.fields)
                    .map(|(v, f)| (f.name().to_string(), v.clone()))
                    .collect();
                serde_json::Value::Object(pairs)
            })
            .collect();
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .with_ignore_type_conflicts(true)
            .with_coerce_primitive(true)
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 10);
        assert_eq!(batch.num_columns(), 10);

        // Null column accepts everything (all null by construction).
        let _ = batch
            .column(0)
            .as_any()
            .downcast_ref::<NullArray>()
            .unwrap();

        // Boolean: only the literal `true` row survives.
        assert!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<BooleanArray>()
                .unwrap()
                .iter()
                .eq([
                    Some(true),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    None
                ])
        );

        // Int32: 42 directly, plus {"a": 1}'s... only numeric-compatible rows
        // survive; the rest are null.
        assert!(batch.column(2).as_primitive::<Int32Type>().iter().eq([
            Some(42),
            Some(1),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None
        ]));

        // Decimal128(10,3): 1.234 => 1234, and 42 => 42000 (scaled).
        assert!(batch.column(3).as_primitive::<Decimal128Type>().iter().eq([
            Some(1234),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            Some(42000)
        ]));

        // Utf8: strings pass through; with coerce_primitive, bools and
        // numbers are stringified too.
        assert!(
            batch
                .column(4)
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap()
                .iter()
                .eq([
                    Some("hi"),
                    Some("ho"),
                    Some("1970-01-01T00:00:00+02:00"),
                    None,
                    None,
                    None,
                    None,
                    Some("true"),
                    Some("42"),
                    Some("1.234"),
                ])
        );

        // Utf8View behaves identically, offset by one rotation.
        assert!(
            batch
                .column(5)
                .as_any()
                .downcast_ref::<StringViewArray>()
                .unwrap()
                .iter()
                .eq([
                    Some("ho"),
                    Some("1970-01-01T00:00:00+02:00"),
                    None,
                    None,
                    None,
                    None,
                    Some("true"),
                    Some("42"),
                    Some("1.234"),
                    Some("hi"),
                ])
        );

        // Timestamp: the RFC 3339 string and the integer 42 parse; all other
        // values null out.
        assert!(
            batch
                .column(6)
                .as_primitive::<TimestampSecondType>()
                .iter()
                .eq([
                    Some(-7200),
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(42),
                    None,
                    None,
                    None,
                ])
        );

        // List: only the row carrying [1, "ho", 3] is non-null; its "ho"
        // element conflicts with Int32 and becomes a null item.
        let arrays = batch
            .column(7)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(
            arrays.nulls(),
            Some(&NullBuffer::from(
                &[
                    true, false, false, false, false, false, false, false, false, false
                ][..]
            ))
        );
        assert_eq!(arrays.offsets()[1], 3);
        let array_values = arrays
            .values()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(array_values.iter().eq([Some(1), None, Some(3)]));

        // Map: both JSON objects ({"k": ...} and {"a": 1}) decode as maps.
        let maps = batch.column(8).as_any().downcast_ref::<MapArray>().unwrap();
        assert_eq!(
            maps.nulls(),
            Some(&NullBuffer::from(
                &[
                    true, true, false, false, false, false, false, false, false, false
                ][..]
            ))
        );
        let map_keys = maps.keys().as_any().downcast_ref::<StringArray>().unwrap();
        assert!(map_keys.iter().eq([Some("k"), Some("a")]));
        let map_values = maps
            .values()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(map_values.iter().eq([Some("value"), Some("1")]));

        // Struct: both objects decode as structs; only {"a": 1} populates the
        // Int32 child.
        let structs = batch
            .column(9)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();
        assert_eq!(
            structs.nulls(),
            Some(&NullBuffer::from(
                &[
                    true, false, false, false, false, false, false, false, false, true
                ][..]
            ))
        );
        let struct_fields = structs
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(struct_fields.slice(0, 2).iter().eq([Some(1), None]));
    }
3240
    #[test]
    fn test_type_conflict_non_nullable() {
        // With ignore_type_conflicts enabled, a conflicting value decodes as a
        // null — which is invalid for a non-nullable field, so flush must fail
        // for every field type below.
        let fields = [
            Field::new("bool", DataType::Boolean, false),
            Field::new("primitive", DataType::Int32, false),
            Field::new("numeric", DataType::Decimal128(10, 3), false),
            Field::new("string", DataType::Utf8, false),
            Field::new("string_view", DataType::Utf8View, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Second, None),
                false,
            ),
            Field::new(
                "array",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                false,
            ),
            Field::new(
                "map",
                DataType::Map(
                    Arc::new(Field::new(
                        "entries",
                        DataType::Struct(Fields::from(vec![
                            Field::new("keys", DataType::Utf8, false),
                            Field::new("values", DataType::Utf8, true),
                        ])),
                        false, // entries not nullable
                    )),
                    false, // keys not sorted
                ),
                false,
            ),
            Field::new(
                "struct",
                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
                false,
            ),
        ];

        // `true` conflicts with the struct/map/list fields and the object
        // conflicts with the scalar fields, so every field sees a conflict.
        let json_values = vec![json!(true), json!({"a": 1})];

        for field in fields {
            let mut decoder = ReaderBuilder::new_with_field(field)
                .with_ignore_type_conflicts(true)
                .build_decoder()
                .unwrap();
            decoder.serialize(&json_values).unwrap();
            decoder
                .flush()
                .expect_err("type conflict on non-nullable type");
        }
    }
3295
3296 #[test]
3297 fn test_ignore_type_conflicts_disabled() {
3298 let fields = [
3299 Field::new("null", DataType::Null, true),
3300 Field::new("bool", DataType::Boolean, true),
3301 Field::new("primitive", DataType::Int32, true),
3302 Field::new("numeric", DataType::Decimal128(10, 3), true),
3303 Field::new("string", DataType::Utf8, true),
3304 Field::new("string_view", DataType::Utf8View, true),
3305 Field::new(
3306 "timestamp",
3307 DataType::Timestamp(TimeUnit::Second, None),
3308 true,
3309 ),
3310 Field::new(
3311 "array",
3312 DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
3313 true,
3314 ),
3315 Field::new(
3316 "map",
3317 DataType::Map(
3318 Arc::new(Field::new(
3319 "entries",
3320 DataType::Struct(Fields::from(vec![
3321 Field::new("keys", DataType::Utf8, false),
3322 Field::new("values", DataType::Utf8, true),
3323 ])),
3324 false, )),
3326 false, ),
3328 true, ),
3330 Field::new(
3331 "struct",
3332 DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
3333 true,
3334 ),
3335 ];
3336
3337 let json_values = vec![json!(true), json!({"a": 1})];
3339
3340 for field in fields {
3341 let mut decoder = ReaderBuilder::new_with_field(field)
3342 .build_decoder()
3343 .unwrap();
3344 decoder.serialize(&json_values).unwrap();
3345 decoder
3346 .flush()
3347 .expect_err("type conflict on non-nullable type");
3348 }
3349 }
3350
    #[test]
    fn test_read_run_end_encoded() {
        // Two runs of equal values: "x" x2 followed by "y" x3.
        let buf = r#"
        {"a": "x"}
        {"a": "x"}
        {"a": "y"}
        {"a": "y"}
        {"a": "y"}
        "#;

        let ree_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int32, false)),
            Arc::new(Field::new("values", DataType::Utf8, true)),
        );
        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0);
        let run_array = col.as_run::<arrow_array::types::Int32Type>();

        // Logical length 5, compressed into run ends [2, 5].
        assert_eq!(run_array.len(), 5);
        assert_eq!(run_array.run_ends().values(), &[2, 5]);

        // The values child stores one entry per run.
        let values = run_array.values().as_string::<i32>();
        assert_eq!(values.len(), 2);
        assert_eq!(values.value(0), "x");
        assert_eq!(values.value(1), "y");
    }
3381
    #[test]
    fn test_read_run_end_encoded_consecutive_nulls() {
        // Rows 2-4 omit "a"; consecutive nulls should collapse into a single
        // run rather than one run per row.
        let buf = r#"
        {"a": "x"}
        {}
        {}
        {}
        {"a": "y"}
        "#;

        let ree_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int32, false)),
            Arc::new(Field::new("values", DataType::Utf8, true)),
        );
        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0);
        let run_array = col.as_run::<arrow_array::types::Int32Type>();

        // Three runs: "x" (1), null (3), "y" (1) => run ends [1, 4, 5].
        assert_eq!(run_array.len(), 5);
        assert_eq!(run_array.run_ends().values(), &[1, 4, 5]);

        let values = run_array.values().as_string::<i32>();
        assert_eq!(values.len(), 3);
        assert_eq!(values.value(0), "x");
        assert!(values.is_null(1));
        assert_eq!(values.value(2), "y");
    }
3413
    #[test]
    fn test_read_run_end_encoded_all_unique() {
        // Every value differs, so each row forms its own length-1 run.
        let buf = r#"
        {"a": 1}
        {"a": 2}
        {"a": 3}
        "#;

        let ree_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int32, false)),
            Arc::new(Field::new("values", DataType::Int32, true)),
        );
        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0);
        let run_array = col.as_run::<arrow_array::types::Int32Type>();

        // Run ends increase by one per row: [1, 2, 3].
        assert_eq!(run_array.len(), 3);
        assert_eq!(run_array.run_ends().values(), &[1, 2, 3]);
    }
3437
    #[test]
    fn test_read_run_end_encoded_int16_run_ends() {
        // The run-ends child may use a narrower integer type; verify Int16 is
        // honoured end-to-end.
        let buf = r#"
        {"a": "x"}
        {"a": "x"}
        {"a": "y"}
        "#;

        let ree_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int16, false)),
            Arc::new(Field::new("values", DataType::Utf8, true)),
        );
        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0);
        let run_array = col.as_run::<arrow_array::types::Int16Type>();

        // Runs "x" x2 and "y" x1 => i16 run ends [2, 3].
        assert_eq!(run_array.len(), 3);
        assert_eq!(run_array.run_ends().values(), &[2i16, 3]);
    }
3460}