1use std::borrow::Cow;
137use std::io::BufRead;
138use std::sync::Arc;
139
140use arrow_array::cast::AsArray;
141use arrow_array::timezone::Tz;
142use arrow_array::types::*;
143use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader, downcast_integer};
144use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef, TimeUnit};
145use chrono::Utc;
146use serde_core::Serialize;
147
148use crate::StructMode;
149use crate::reader::binary_array::{
150 BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
151};
152use crate::reader::boolean_array::BooleanArrayDecoder;
153use crate::reader::decimal_array::DecimalArrayDecoder;
154use crate::reader::list_array::{
155 FixedSizeListArrayDecoder, ListArrayDecoder, ListViewArrayDecoder,
156};
157use crate::reader::map_array::MapArrayDecoder;
158use crate::reader::null_array::NullArrayDecoder;
159use crate::reader::primitive_array::PrimitiveArrayDecoder;
160use crate::reader::run_end_array::RunEndEncodedArrayDecoder;
161use crate::reader::string_array::StringArrayDecoder;
162use crate::reader::string_view_array::StringViewArrayDecoder;
163use crate::reader::struct_array::StructArrayDecoder;
164use crate::reader::tape::{Tape, TapeDecoder};
165use crate::reader::timestamp_array::TimestampArrayDecoder;
166
167pub use schema::*;
168pub use value_iter::ValueIter;
169
170mod binary_array;
171mod boolean_array;
172mod decimal_array;
173mod list_array;
174mod map_array;
175mod null_array;
176mod primitive_array;
177mod run_end_array;
178mod schema;
179mod serializer;
180mod string_array;
181mod string_view_array;
182mod struct_array;
183mod tape;
184mod timestamp_array;
185mod value_iter;
186
/// A builder for a JSON [`Reader`] or [`Decoder`] that decodes JSON data
/// into Arrow record batches with a known schema.
pub struct ReaderBuilder {
    /// Maximum number of rows per emitted batch (defaults to 1024 in `new`)
    batch_size: usize,
    /// Allow primitive JSON values (numbers, booleans) to be coerced into
    /// non-matching column types (e.g. numbers into string columns)
    coerce_primitive: bool,
    /// Strict decoding mode; interpreted by the individual array decoders
    strict_mode: bool,
    /// Whether type conflicts should be ignored; forwarded to the decoders
    ignore_type_conflicts: bool,
    /// If true, the input is a stream of bare values matching the schema's
    /// single field rather than JSON objects (see `build_decoder`)
    is_field: bool,
    /// How JSON structs are decoded; forwarded to the decoders
    struct_mode: StructMode,

    /// Schema of the record batches that will be produced
    schema: SchemaRef,
}
198
199impl ReaderBuilder {
200 pub fn new(schema: SchemaRef) -> Self {
209 Self {
210 batch_size: 1024,
211 coerce_primitive: false,
212 strict_mode: false,
213 ignore_type_conflicts: false,
214 is_field: false,
215 struct_mode: Default::default(),
216 schema,
217 }
218 }
219
220 pub fn new_with_field(field: impl Into<FieldRef>) -> Self {
251 Self {
252 batch_size: 1024,
253 coerce_primitive: false,
254 strict_mode: false,
255 ignore_type_conflicts: false,
256 is_field: true,
257 struct_mode: Default::default(),
258 schema: Arc::new(Schema::new([field.into()])),
259 }
260 }
261
262 pub fn with_batch_size(self, batch_size: usize) -> Self {
264 Self { batch_size, ..self }
265 }
266
267 pub fn with_coerce_primitive(self, coerce_primitive: bool) -> Self {
270 Self {
271 coerce_primitive,
272 ..self
273 }
274 }
275
276 pub fn with_strict_mode(self, strict_mode: bool) -> Self {
282 Self {
283 strict_mode,
284 ..self
285 }
286 }
287
288 pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
292 Self {
293 struct_mode,
294 ..self
295 }
296 }
297
298 pub fn with_ignore_type_conflicts(self, ignore_type_conflicts: bool) -> Self {
311 Self {
312 ignore_type_conflicts,
313 ..self
314 }
315 }
316
317 pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError> {
319 Ok(Reader {
320 reader,
321 decoder: self.build_decoder()?,
322 })
323 }
324
325 pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
327 let (data_type, nullable) = if self.is_field {
328 let field = &self.schema.fields[0];
329 let data_type = Cow::Borrowed(field.data_type());
330 (data_type, field.is_nullable())
331 } else {
332 let data_type = Cow::Owned(DataType::Struct(self.schema.fields.clone()));
333 (data_type, false)
334 };
335
336 let ctx = DecoderContext {
337 coerce_primitive: self.coerce_primitive,
338 strict_mode: self.strict_mode,
339 struct_mode: self.struct_mode,
340 ignore_type_conflicts: self.ignore_type_conflicts,
341 };
342 let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?;
343
344 let num_fields = self.schema.flattened_fields().len();
345
346 Ok(Decoder {
347 decoder,
348 is_field: self.is_field,
349 tape_decoder: TapeDecoder::new(self.batch_size, num_fields),
350 batch_size: self.batch_size,
351 schema: self.schema,
352 })
353 }
354}
355
/// Reads JSON data with a known schema from a [`BufRead`] source directly
/// into Arrow [`RecordBatch`]es, via its [`Iterator`] implementation.
pub struct Reader<R> {
    /// The underlying source of JSON bytes
    reader: R,
    /// The push-based decoder fed from `reader`
    decoder: Decoder,
}
363
364impl<R> std::fmt::Debug for Reader<R> {
365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366 f.debug_struct("Reader")
367 .field("decoder", &self.decoder)
368 .finish()
369 }
370}
371
impl<R: BufRead> Reader<R> {
    /// Reads the next [`RecordBatch`], returning `Ok(None)` once the input
    /// is exhausted and no buffered rows remain.
    fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        loop {
            let buf = self.reader.fill_buf()?;
            if buf.is_empty() {
                // EOF: fall through and flush whatever rows are buffered
                break;
            }
            let read = buf.len();

            let decoded = self.decoder.decode(buf)?;
            // Only mark as consumed what the decoder actually accepted
            self.reader.consume(decoded);
            // A short decode means the decoder stopped early (e.g. it has
            // buffered a full batch), so flush before reading more
            if decoded != read {
                break;
            }
        }
        self.decoder.flush()
    }
}
391
392impl<R: BufRead> Iterator for Reader<R> {
393 type Item = Result<RecordBatch, ArrowError>;
394
395 fn next(&mut self) -> Option<Self::Item> {
396 self.read().transpose()
397 }
398}
399
400impl<R: BufRead> RecordBatchReader for Reader<R> {
401 fn schema(&self) -> SchemaRef {
402 self.decoder.schema.clone()
403 }
404}
405
/// A low-level, push-based decoder that accepts JSON bytes incrementally
/// (see [`Decoder::decode`]) and emits [`RecordBatch`]es on
/// [`Decoder::flush`].
pub struct Decoder {
    /// Buffers decoded JSON rows as a tape until flushed
    tape_decoder: TapeDecoder,
    /// Converts buffered tape rows into the output array
    decoder: Box<dyn ArrayDecoder>,
    /// Maximum number of rows per batch
    batch_size: usize,
    /// If true, decoded values form the single column of the schema;
    /// otherwise each row decodes to a struct that becomes the batch
    is_field: bool,
    /// Schema of the produced batches
    schema: SchemaRef,
}
453
454impl std::fmt::Debug for Decoder {
455 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
456 f.debug_struct("Decoder")
457 .field("schema", &self.schema)
458 .field("batch_size", &self.batch_size)
459 .finish()
460 }
461}
462
impl Decoder {
    /// Decodes JSON from `buf`, returning the number of bytes consumed.
    /// Fewer than `buf.len()` bytes may be consumed, e.g. once a full batch
    /// of rows has been buffered; call [`Self::flush`] and then continue.
    pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError> {
        self.tape_decoder.decode(buf)
    }

    /// Serializes `rows` directly into the buffered tape, bypassing JSON
    /// text parsing.
    pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
        self.tape_decoder.serialize(rows)
    }

    /// Returns true if a row is only partially decoded (the last `decode`
    /// call ended mid-record).
    pub fn has_partial_record(&self) -> bool {
        self.tape_decoder.has_partial_row()
    }

    /// Returns the number of complete rows currently buffered.
    pub fn len(&self) -> usize {
        self.tape_decoder.num_buffered_rows()
    }

    /// Returns true if no complete rows are buffered.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Flushes the buffered rows into a [`RecordBatch`], returning
    /// `Ok(None)` if no complete rows are buffered.
    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let tape = self.tape_decoder.finish()?;

        if tape.num_rows() == 0 {
            return Ok(None);
        }

        // Walk the tape once to collect the starting element of each row;
        // rows begin at tape element 1
        let mut next_object = 1;
        let pos: Vec<_> = (0..tape.num_rows())
            .map(|_| {
                // `next` returns where the following row starts; record the
                // current start and advance
                let next = tape.next(next_object, "row").unwrap();
                std::mem::replace(&mut next_object, next)
            })
            .collect();

        let decoded = self.decoder.decode(&tape, &pos)?;
        // Reset the tape so subsequent `decode` calls start a fresh batch
        self.tape_decoder.clear();

        // In field mode the decoded array is the single column; otherwise
        // the decoded struct array's children become the batch's columns
        let batch = match self.is_field {
            true => RecordBatch::try_new(self.schema.clone(), vec![decoded])?,
            false => {
                RecordBatch::from(decoded.as_struct().clone()).with_schema(self.schema.clone())?
            }
        };

        Ok(Some(batch))
    }
}
706
/// Decodes rows from a [`Tape`] into a single Arrow array; one implementation
/// exists per supported [`DataType`] (see [`make_decoder`]).
trait ArrayDecoder: Send {
    /// Decodes the tape elements starting at each index in `pos` into an
    /// [`ArrayRef`] with `pos.len()` rows.
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError>;
}
711
/// Shared configuration handed to each [`ArrayDecoder`] constructor while
/// building the decoder tree (see [`make_decoder`]).
pub struct DecoderContext {
    /// Allow primitive JSON values to be coerced into non-matching types
    coerce_primitive: bool,
    /// Strict decoding mode; semantics are defined by each decoder
    strict_mode: bool,
    /// How JSON structs are decoded
    struct_mode: StructMode,
    /// Whether type conflicts should be ignored by the decoders
    ignore_type_conflicts: bool,
}
726
impl DecoderContext {
    /// Returns whether primitive coercion is enabled.
    pub fn coerce_primitive(&self) -> bool {
        self.coerce_primitive
    }

    /// Returns whether strict mode is enabled.
    pub fn strict_mode(&self) -> bool {
        self.strict_mode
    }

    /// Returns the configured [`StructMode`].
    pub fn struct_mode(&self) -> StructMode {
        self.struct_mode
    }

    /// Returns whether type conflicts are ignored.
    pub fn ignore_type_conflicts(&self) -> bool {
        self.ignore_type_conflicts
    }

    /// Constructs an [`ArrayDecoder`] for `data_type` with this context's
    /// settings; thin wrapper over the free function [`make_decoder`].
    fn make_decoder(
        &self,
        data_type: &DataType,
        is_nullable: bool,
    ) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
        make_decoder(self, data_type, is_nullable)
    }
}
760
/// Constructs the [`ArrayDecoder`] for `data_type`, recursing into nested
/// types (lists, structs, maps, ...) via the per-type decoder constructors.
///
/// Returns [`ArrowError::NotYetImplemented`] for data types the JSON reader
/// does not support.
fn make_decoder(
    ctx: &DecoderContext,
    data_type: &DataType,
    is_nullable: bool,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
    // Boilerplate helpers for constructing the typed decoders below
    macro_rules! primitive_decoder {
        ($t:ty, $data_type:expr) => {
            Ok(Box::new(PrimitiveArrayDecoder::<$t>::new(ctx, $data_type)))
        };
    }
    macro_rules! timestamp_decoder {
        ($t:ty, $data_type:expr, $tz:expr) => {{
            Ok(Box::new(TimestampArrayDecoder::<$t, _>::new(
                ctx, $data_type, $tz,
            )))
        }};
    }
    macro_rules! decimal_decoder {
        ($t:ty, $p:expr, $s:expr) => {
            Ok(Box::new(DecimalArrayDecoder::<$t>::new(ctx, $p, $s)))
        };
    }

    // The first line expands to one match arm per integer type, each
    // invoking `primitive_decoder!`; the explicit arms follow
    downcast_integer! {
        *data_type => (primitive_decoder, data_type),
        DataType::Null => Ok(Box::new(NullArrayDecoder::new(ctx))),
        DataType::Float16 => primitive_decoder!(Float16Type, data_type),
        DataType::Float32 => primitive_decoder!(Float32Type, data_type),
        DataType::Float64 => primitive_decoder!(Float64Type, data_type),
        // Timestamps without a timezone are decoded relative to UTC
        DataType::Timestamp(TimeUnit::Second, None) => {
            timestamp_decoder!(TimestampSecondType, data_type, Utc)
        },
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            timestamp_decoder!(TimestampMillisecondType, data_type, Utc)
        },
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            timestamp_decoder!(TimestampMicrosecondType, data_type, Utc)
        },
        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
            timestamp_decoder!(TimestampNanosecondType, data_type, Utc)
        },
        // Timezone strings are parsed eagerly so invalid zones fail here
        DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            timestamp_decoder!(TimestampSecondType, data_type, tz)
        },
        DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            timestamp_decoder!(TimestampMillisecondType, data_type, tz)
        },
        DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            timestamp_decoder!(TimestampMicrosecondType, data_type, tz)
        },
        DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
            let tz: Tz = tz.parse()?;
            timestamp_decoder!(TimestampNanosecondType, data_type, tz)
        },
        DataType::Date32 => primitive_decoder!(Date32Type, data_type),
        DataType::Date64 => primitive_decoder!(Date64Type, data_type),
        DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
        DataType::Time32(TimeUnit::Millisecond) => primitive_decoder!(Time32MillisecondType, data_type),
        DataType::Time64(TimeUnit::Microsecond) => primitive_decoder!(Time64MicrosecondType, data_type),
        DataType::Time64(TimeUnit::Nanosecond) => primitive_decoder!(Time64NanosecondType, data_type),
        DataType::Duration(TimeUnit::Nanosecond) => primitive_decoder!(DurationNanosecondType, data_type),
        DataType::Duration(TimeUnit::Microsecond) => primitive_decoder!(DurationMicrosecondType, data_type),
        DataType::Duration(TimeUnit::Millisecond) => primitive_decoder!(DurationMillisecondType, data_type),
        DataType::Duration(TimeUnit::Second) => primitive_decoder!(DurationSecondType, data_type),
        DataType::Decimal32(p, s) => decimal_decoder!(Decimal32Type, p, s),
        DataType::Decimal64(p, s) => decimal_decoder!(Decimal64Type, p, s),
        DataType::Decimal128(p, s) => decimal_decoder!(Decimal128Type, p, s),
        DataType::Decimal256(p, s) => decimal_decoder!(Decimal256Type, p, s),
        DataType::Boolean => Ok(Box::new(BooleanArrayDecoder::new(ctx))),
        DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(ctx))),
        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(ctx))),
        DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(ctx))),
        // Nested types recurse into make_decoder via their constructors
        DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
        DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
        DataType::ListView(_) => Ok(Box::new(ListViewArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
        DataType::LargeListView(_) => Ok(Box::new(ListViewArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
        DataType::FixedSizeList(_, _) => Ok(Box::new(FixedSizeListArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::Binary => Ok(Box::new(BinaryArrayDecoder::<i32>::default())),
        DataType::LargeBinary => Ok(Box::new(BinaryArrayDecoder::<i64>::default())),
        DataType::FixedSizeBinary(len) => Ok(Box::new(FixedSizeBinaryArrayDecoder::new(len))),
        DataType::BinaryView => Ok(Box::new(BinaryViewDecoder::default())),
        DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(ctx, data_type, is_nullable)?)),
        DataType::RunEndEncoded(ref r, _) => match r.data_type() {
            DataType::Int16 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int16Type>::new(ctx, data_type, is_nullable)?)),
            DataType::Int32 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int32Type>::new(ctx, data_type, is_nullable)?)),
            DataType::Int64 => Ok(Box::new(RunEndEncodedArrayDecoder::<Int64Type>::new(ctx, data_type, is_nullable)?)),
            // Run-end indices are restricted to Int16/Int32/Int64 elsewhere
            d => unreachable!("unsupported run end index type: {d}"),
        },
        _ => Err(ArrowError::NotYetImplemented(format!("Support for {data_type} in JSON reader")))
    }
}
856
857#[cfg(test)]
858mod tests {
859 use arrow_array::cast::AsArray;
860 use arrow_array::{
861 Array, BooleanArray, Float64Array, GenericListViewArray, Int32Array, ListArray, MapArray,
862 NullArray, OffsetSizeTrait, StringArray, StringViewArray, StructArray,
863 };
864 use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer};
865 use arrow_cast::display::{ArrayFormatter, FormatOptions};
866 use arrow_schema::{Field, Fields};
867 use serde_json::json;
868 use std::fs::File;
869 use std::io::{BufReader, Cursor, Seek};
870
871 use super::*;
872
873 fn do_read(
874 buf: &str,
875 batch_size: usize,
876 coerce_primitive: bool,
877 strict_mode: bool,
878 schema: SchemaRef,
879 ) -> Vec<RecordBatch> {
880 let mut unbuffered = vec![];
881
882 for batch_size in [1, 3, 100, batch_size] {
884 unbuffered = ReaderBuilder::new(schema.clone())
885 .with_batch_size(batch_size)
886 .with_coerce_primitive(coerce_primitive)
887 .build(Cursor::new(buf.as_bytes()))
888 .unwrap()
889 .collect::<Result<Vec<_>, _>>()
890 .unwrap();
891
892 for b in unbuffered.iter().take(unbuffered.len() - 1) {
893 assert_eq!(b.num_rows(), batch_size)
894 }
895
896 for b in [1, 3, 5] {
898 let buffered = ReaderBuilder::new(schema.clone())
899 .with_batch_size(batch_size)
900 .with_coerce_primitive(coerce_primitive)
901 .with_strict_mode(strict_mode)
902 .build(BufReader::with_capacity(b, Cursor::new(buf.as_bytes())))
903 .unwrap()
904 .collect::<Result<Vec<_>, _>>()
905 .unwrap();
906 assert_eq!(unbuffered, buffered);
907 }
908 }
909
910 unbuffered
911 }
912
    // Exercises basic primitive columns, out-of-order keys, missing fields,
    // explicit nulls, and blank lines; also drives the push Decoder directly
    #[test]
    fn test_basic() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true, "d": 1}
        {"a": 2E0, "b": 4, "c": false, "d": 2, "e": 254}

        {"b": 6, "a": 2.0, "d": 45}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, true),
            Field::new("b", DataType::Int32, true),
            Field::new("c", DataType::Boolean, true),
            Field::new("d", DataType::Date32, true),
            Field::new("e", DataType::Date64, true),
        ]));

        // Low-level Decoder API: decode in one call, then flush one batch
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());
        // All 221 bytes of the input are consumed in a single call
        assert_eq!(decoder.decode(buf.as_bytes()).unwrap(), 221);
        assert!(!decoder.is_empty());
        assert_eq!(decoder.len(), 6);
        assert!(!decoder.has_partial_record());
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 6);
        // Flushing resets the decoder state
        assert!(decoder.is_empty());
        assert_eq!(decoder.len(), 0);
        assert!(!decoder.has_partial_record());

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": numbers like 2E0 / 2.0 decode into Int64; missing/null -> null
        let col1 = batches[0].column(0).as_primitive::<Int64Type>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.values(), &[1, 2, 2, 2, 0, 0]);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        // "b": present in every row; "5" and 4e0 decode to integers
        let col2 = batches[0].column(1).as_primitive::<Int32Type>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.values(), &[2, 4, 6, 5, 4, 7]);

        let col3 = batches[0].column(2).as_boolean();
        assert_eq!(col3.null_count(), 4);
        assert!(col3.value(0));
        assert!(!col3.is_null(0));
        assert!(!col3.value(1));
        assert!(!col3.is_null(1));

        let col4 = batches[0].column(3).as_primitive::<Date32Type>();
        assert_eq!(col4.null_count(), 3);
        assert!(col4.is_null(3));
        assert_eq!(col4.values(), &[1, 2, 45, 0, 0, 0]);

        let col5 = batches[0].column(4).as_primitive::<Date64Type>();
        assert_eq!(col5.null_count(), 5);
        assert!(col5.is_null(0));
        assert!(col5.is_null(2));
        assert!(col5.is_null(3));
        assert_eq!(col5.values(), &[0, 254, 0, 0, 0, 0]);
    }
979
    // Verifies Utf8 and LargeUtf8 decoding, including escape sequences,
    // \uXXXX escapes, surrogate pairs, empty strings, and nulls
    #[test]
    fn test_string() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        // \ud83d\ude00 is a surrogate pair decoding to 😀
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        // Empty string is distinct from null
        assert_eq!(col2.value(4), "");
    }
1015
    // Checks that decoding into Utf8View allocates a data buffer large
    // enough for the strings that cannot be stored inline in the views
    #[test]
    fn test_long_string_view_allocation() {
        // Minimum expected size of the out-of-line data buffer.
        // NOTE(review): presumably derived from the combined length of the
        // strings exceeding the view inline limit — confirm
        let expected_capacity: usize = 41;

        let buf = r#"
        {"a": "short", "b": "dummy"}
        {"a": "this is definitely long", "b": "dummy"}
        {"a": "hello", "b": "dummy"}
        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_a = batches[0].column(0);
        let string_view_array = col_a
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        // Buffer 0 of the array data holds the out-of-line string bytes
        let data_buffer = string_view_array.to_data().buffers()[0].len();

        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Values must round-trip regardless of inline vs out-of-line storage
        assert_eq!(string_view_array.value(0), "short");
        assert_eq!(string_view_array.value(1), "this is definitely long");
        assert_eq!(string_view_array.value(2), "hello");
        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
    }
1068
    // Checks that numbers coerced into Utf8View (coerce_primitive = true)
    // preserve their exact textual form and allocate a sufficient buffer
    #[test]
    fn test_numeric_view_allocation() {
        // Minimum expected size of the out-of-line data buffer.
        // NOTE(review): presumably the combined length of the coerced
        // strings exceeding the view inline limit — confirm
        let expected_capacity: usize = 33;

        let buf = r#"
        {"n": 123456789}
        {"n": 1000000000000}
        {"n": 3.1415}
        {"n": 2.718281828459045}
        "#;

        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

        // coerce_primitive = true turns the numbers into strings
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1, "Expected one record batch");

        let col_n = batches[0].column(0);
        let string_view_array = col_n
            .as_any()
            .downcast_ref::<StringViewArray>()
            .expect("Column should be a StringViewArray");

        let data_buffer = string_view_array.to_data().buffers()[0].len();
        assert!(
            data_buffer >= expected_capacity,
            "Data buffer length ({data_buffer}) should be at least {expected_capacity}",
        );

        // Coercion preserves the source text verbatim (no reformatting)
        assert_eq!(string_view_array.value(0), "123456789");
        assert_eq!(string_view_array.value(1), "1000000000000");
        assert_eq!(string_view_array.value(2), "3.1415");
        assert_eq!(string_view_array.value(3), "2.718281828459045");
    }
1113
    // Same fixture as test_string, but decoding column "a" as Utf8View
    // NOTE(review): "uft8view" in the name is a typo for "utf8view";
    // left unchanged here
    #[test]
    fn test_string_with_uft8view() {
        let buf = r#"
        {"a": "1", "b": "2"}
        {"a": "hello", "b": "shoo"}
        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

        {"b": null}
        {"b": "", "a": null}

        "#;
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::LargeUtf8, true),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col1 = batches[0].column(0).as_string_view();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "hello");
        // Surrogate pair \ud83d\ude00 decodes to 😀
        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert_eq!(col1.data_type(), &DataType::Utf8View);

        let col2 = batches[0].column(1).as_string::<i64>();
        assert_eq!(col2.null_count(), 1);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "shoo");
        assert_eq!(col2.value(2), "\t😁foo");
        assert!(col2.is_null(3));
        assert_eq!(col2.value(4), "");
    }
1150
1151 #[test]
1152 fn test_complex() {
1153 let buf = r#"
1154 {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3}, {"c": 4}]}}
1155 {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
1156 {"list": null, "nested": {"a": null}}
1157 "#;
1158
1159 let schema = Arc::new(Schema::new(vec![
1160 Field::new_list("list", Field::new("element", DataType::Int32, false), true),
1161 Field::new_struct(
1162 "nested",
1163 vec![
1164 Field::new("a", DataType::Int32, true),
1165 Field::new("b", DataType::Int32, true),
1166 ],
1167 true,
1168 ),
1169 Field::new_struct(
1170 "nested_list",
1171 vec![Field::new_list(
1172 "list2",
1173 Field::new_struct(
1174 "element",
1175 vec![Field::new("c", DataType::Int32, false)],
1176 false,
1177 ),
1178 true,
1179 )],
1180 true,
1181 ),
1182 ]));
1183
1184 let batches = do_read(buf, 1024, false, false, schema);
1185 assert_eq!(batches.len(), 1);
1186
1187 let list = batches[0].column(0).as_list::<i32>();
1188 assert_eq!(list.len(), 3);
1189 assert_eq!(list.value_offsets(), &[0, 0, 2, 2]);
1190 assert_eq!(list.null_count(), 1);
1191 assert!(list.is_null(2));
1192 let list_values = list.values().as_primitive::<Int32Type>();
1193 assert_eq!(list_values.values(), &[5, 6]);
1194
1195 let nested = batches[0].column(1).as_struct();
1196 let a = nested.column(0).as_primitive::<Int32Type>();
1197 assert_eq!(list.null_count(), 1);
1198 assert_eq!(a.values(), &[1, 7, 0]);
1199 assert!(list.is_null(2));
1200
1201 let b = nested.column(1).as_primitive::<Int32Type>();
1202 assert_eq!(b.null_count(), 2);
1203 assert_eq!(b.len(), 3);
1204 assert_eq!(b.value(0), 2);
1205 assert!(b.is_null(1));
1206 assert!(b.is_null(2));
1207
1208 let nested_list = batches[0].column(2).as_struct();
1209 assert_eq!(nested_list.len(), 3);
1210 assert_eq!(nested_list.null_count(), 1);
1211 assert!(nested_list.is_null(2));
1212
1213 let list2 = nested_list.column(0).as_list::<i32>();
1214 assert_eq!(list2.len(), 3);
1215 assert_eq!(list2.null_count(), 1);
1216 assert_eq!(list2.value_offsets(), &[0, 2, 2, 2]);
1217 assert!(list2.is_null(2));
1218
1219 let list2_values = list2.values().as_struct();
1220
1221 let c = list2_values.column(0).as_primitive::<Int32Type>();
1222 assert_eq!(c.values(), &[3, 4]);
1223 }
1224
    // Verifies that a schema covering only a subset of the JSON keys
    // projects those fields and ignores the rest, including nested fields
    #[test]
    fn test_projection() {
        let buf = r#"
        {"list": [], "nested": {"a": 1, "b": 2}, "nested_list": {"list2": [{"c": 3, "d": 5}, {"c": 4}]}}
        {"list": [5, 6], "nested": {"a": 7}, "nested_list": {"list2": []}}
        "#;

        // Schema omits "list", "nested.b", and "nested_list.list2.c"
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "nested",
                vec![Field::new("a", DataType::Int32, false)],
                true,
            ),
            Field::new_struct(
                "nested_list",
                vec![Field::new_list(
                    "list2",
                    Field::new_struct(
                        "element",
                        vec![Field::new("d", DataType::Int32, true)],
                        false,
                    ),
                    true,
                )],
                true,
            ),
        ]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let nested = batches[0].column(0).as_struct();
        assert_eq!(nested.num_columns(), 1);
        let a = nested.column(0).as_primitive::<Int32Type>();
        assert_eq!(a.null_count(), 0);
        assert_eq!(a.values(), &[1, 7]);

        let nested_list = batches[0].column(1).as_struct();
        assert_eq!(nested_list.num_columns(), 1);
        assert_eq!(nested_list.null_count(), 0);

        let list2 = nested_list.column(0).as_list::<i32>();
        assert_eq!(list2.value_offsets(), &[0, 2, 2]);
        assert_eq!(list2.null_count(), 0);

        let child = list2.values().as_struct();
        assert_eq!(child.num_columns(), 1);
        assert_eq!(child.len(), 2);
        assert_eq!(child.null_count(), 0);

        // "d" only appears in the first list element; the second is null
        let c = child.column(0).as_primitive::<Int32Type>();
        assert_eq!(c.values(), &[5, 0]);
        assert_eq!(c.null_count(), 1);
        assert!(c.is_null(1));
    }
1280
    // Decodes JSON objects into a Map column with list-valued entries,
    // checking key/value flattening, offsets, and display formatting
    #[test]
    fn test_map() {
        let buf = r#"
        {"map": {"a": ["foo", null]}}
        {"map": {"a": [null], "b": []}}
        {"map": {"c": null, "a": ["baz"]}}
        "#;
        let map = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new_list("value", Field::new("element", DataType::Utf8, true), true),
            false,
            true,
        );

        let schema = Arc::new(Schema::new(vec![map]));

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let map = batches[0].column(0).as_map();
        let map_keys = map.keys().as_string::<i32>();
        let map_values = map.values().as_list::<i32>();
        // Entries per row: 1, 2, 2
        assert_eq!(map.value_offsets(), &[0, 1, 3, 5]);

        let k: Vec<_> = map_keys.iter().flatten().collect();
        assert_eq!(&k, &["a", "a", "b", "c", "a"]);

        // Flattened list elements across all entries
        let list_values = map_values.values().as_string::<i32>();
        let lv: Vec<_> = list_values.iter().collect();
        assert_eq!(&lv, &[Some("foo"), None, None, Some("baz")]);
        assert_eq!(map_values.value_offsets(), &[0, 2, 3, 3, 3, 4]);
        // The entry "c": null yields a null list value
        assert_eq!(map_values.null_count(), 1);
        assert!(map_values.is_null(3));

        let options = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(map, &options).unwrap();
        assert_eq!(formatter.value(0).to_string(), "{a: [foo, null]}");
        assert_eq!(formatter.value(1).to_string(), "{a: [null], b: []}");
        assert_eq!(formatter.value(2).to_string(), "{c: null, a: [baz]}");
    }
1323
    // Without with_coerce_primitive(true), numbers and booleans must fail
    // to decode into a Utf8 column with a descriptive error
    #[test]
    fn test_not_coercing_primitive_into_string_without_flag() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)]));

        let buf = r#"{"a": 1}"#;
        let err = ReaderBuilder::new(schema.clone())
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got 1"
        );

        let buf = r#"{"a": true}"#;
        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'a': expected string got true"
        );
    }
1354
    // With coerce_primitive enabled, numbers and booleans decode into Utf8
    // columns preserving their exact source text
    #[test]
    fn test_coercing_primitive_into_string() {
        let buf = r#"
        {"a": 1, "b": 2, "c": true}
        {"a": 2E0, "b": 4, "c": false}

        {"b": 6, "a": 2.0}
        {"b": "5", "a": 2}
        {"b": 4e0}
        {"b": 7, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));

        // coerce_primitive = true
        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Numbers keep their original spelling ("2E0", "2.0", not "2")
        let col1 = batches[0].column(0).as_string::<i32>();
        assert_eq!(col1.null_count(), 2);
        assert_eq!(col1.value(0), "1");
        assert_eq!(col1.value(1), "2E0");
        assert_eq!(col1.value(2), "2.0");
        assert_eq!(col1.value(3), "2");
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));

        let col2 = batches[0].column(1).as_string::<i32>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(col2.value(0), "2");
        assert_eq!(col2.value(1), "4");
        assert_eq!(col2.value(2), "6");
        assert_eq!(col2.value(3), "5");
        assert_eq!(col2.value(4), "4e0");
        assert_eq!(col2.value(5), "7");

        // Booleans coerce to "true"/"false"; missing keys remain null
        let col3 = batches[0].column(2).as_string::<i32>();
        assert_eq!(col3.null_count(), 4);
        assert_eq!(col3.value(0), "true");
        assert_eq!(col3.value(1), "false");
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
    }
1403
    // Shared body for the decimal tests: decodes numbers and numeric
    // strings into a scale-2 decimal type, checking the scaled raw values
    fn test_decimal<T: DecimalType>(data_type: DataType) {
        let buf = r#"
        {"a": 1, "b": 2, "c": 38.30}
        {"a": 2, "b": 4, "c": 123.456}

        {"b": 1337, "a": "2.0452"}
        {"b": "5", "a": "11034.2"}
        {"b": 40}
        {"b": 1234, "a": null}
        "#;

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", data_type.clone(), true),
            Field::new("b", data_type.clone(), true),
            Field::new("c", data_type, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Raw values are scaled by 10^2; e.g. "2.0452" -> 204 (digits
        // beyond the scale are dropped)
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 2);
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(
            col1.values(),
            &[100, 200, 204, 1103420, 0, 0].map(T::Native::usize_as)
        );

        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 0);
        assert_eq!(
            col2.values(),
            &[200, 400, 133700, 500, 4000, 123400].map(T::Native::usize_as)
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 4);
        assert!(!col3.is_null(0));
        assert!(!col3.is_null(1));
        assert!(col3.is_null(2));
        assert!(col3.is_null(3));
        assert!(col3.is_null(4));
        assert!(col3.is_null(5));
        assert_eq!(
            col3.values(),
            &[3830, 12345, 0, 0, 0, 0].map(T::Native::usize_as)
        );
    }
1453
    // Runs the shared decimal fixture against every decimal width
    #[test]
    fn test_decimals() {
        test_decimal::<Decimal32Type>(DataType::Decimal32(8, 2));
        test_decimal::<Decimal64Type>(DataType::Decimal64(10, 2));
        test_decimal::<Decimal128Type>(DataType::Decimal128(10, 2));
        test_decimal::<Decimal256Type>(DataType::Decimal256(10, 2));
    }
1461
    // Shared body for the timestamp tests: decodes raw numbers, RFC 3339
    // strings, space-separated variants, and date-only strings, with and
    // without a column timezone, for the unit given by `T`
    fn test_timestamp<T: ArrowTimestampType>() {
        let buf = r#"
        {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
        {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}

        {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
        {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
        {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
        {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
        "#;

        // Column "d" carries an explicit +08:00 zone; a/b/c are zone-less
        let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".into()));
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
            Field::new("d", with_timezone, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Expected values below are written in nanoseconds and divided by
        // this factor to convert into T's unit
        let unit_in_nanos: i64 = match T::UNIT {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // Raw numbers (40, 1234) pass through untouched; strings are parsed
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                1599572549190855000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                1599572549000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
        );

        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                854702816123000000 / unit_in_nanos,
                1599572549190855000 / unit_in_nanos,
                854702816123000000 / unit_in_nanos,
                854738816123000000 / unit_in_nanos
            ]
        );

        // Zone-less strings in column "d" are interpreted in +08:00
        let col4 = batches[0].column(3).as_primitive::<T>();

        assert_eq!(col4.null_count(), 0);
        assert_eq!(
            col4.values(),
            &[
                854674016123000000 / unit_in_nanos,
                123,
                854702816123000000 / unit_in_nanos,
                854720816123000000 / unit_in_nanos,
                854674016000000000 / unit_in_nanos,
                854640000000000000 / unit_in_nanos
            ]
        );
    }
1543
1544 #[test]
1545 fn test_timestamps() {
1546 test_timestamp::<TimestampSecondType>();
1547 test_timestamp::<TimestampMillisecondType>();
1548 test_timestamp::<TimestampMicrosecondType>();
1549 test_timestamp::<TimestampNanosecondType>();
1550 }
1551
    /// Shared time-of-day read test, parameterised over the time type `T`
    /// (`Time32`/`Time64` at any unit).
    ///
    /// The input mixes 12-hour ("AM"/"pm") and 24-hour strings, raw numeric
    /// values, a duplicate "b" key in row 2 (the last value wins), and a
    /// missing/null "a" field.
    fn test_time<T: ArrowTemporalType>() {
        let buf = r#"
        {"a": 1, "b": "09:26:56.123 AM", "c": 38.30}
        {"a": 2, "b": "23:59:59", "c": 123.456}

        {"b": 1337, "b": "6:00 pm", "c": "09:26:56.123"}
        {"b": 40, "c": "13:42:29.190855"}
        {"b": 1234, "a": null, "c": "09:26:56.123"}
        {"c": "14:26:56.123"}
        "#;

        // Extract the time unit from the concrete Time32/Time64 data type.
        let unit = match T::DATA_TYPE {
            DataType::Time32(unit) | DataType::Time64(unit) => unit,
            _ => unreachable!(),
        };

        // Expected values below are written in nanoseconds since midnight
        // and scaled down to the unit under test.
        let unit_in_nanos = match unit {
            TimeUnit::Second => 1_000_000_000,
            TimeUnit::Millisecond => 1_000_000,
            TimeUnit::Microsecond => 1_000,
            TimeUnit::Nanosecond => 1,
        };

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", T::DATA_TYPE, true),
            Field::new("b", T::DATA_TYPE, true),
            Field::new("c", T::DATA_TYPE, true),
        ]));

        let batches = do_read(buf, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // "a": numeric values decode; missing/null rows are null with
        // zeroed slots.
        let col1 = batches[0].column(0).as_primitive::<T>();
        assert_eq!(col1.null_count(), 4);
        assert!(col1.is_null(2));
        assert!(col1.is_null(3));
        assert!(col1.is_null(4));
        assert!(col1.is_null(5));
        assert_eq!(col1.values(), &[1, 2, 0, 0, 0, 0].map(T::Native::usize_as));

        // "b": strings parse in both 12h and 24h form; raw numbers (40, 1234)
        // pass through verbatim.
        let col2 = batches[0].column(1).as_primitive::<T>();
        assert_eq!(col2.null_count(), 1);
        assert!(col2.is_null(5));
        assert_eq!(
            col2.values(),
            &[
                34016123000000 / unit_in_nanos,
                86399000000000 / unit_in_nanos,
                64800000000000 / unit_in_nanos,
                40,
                1234,
                0
            ]
            .map(T::Native::usize_as)
        );

        // "c": floats are truncated to integers (38.30 -> 38, 123.456 -> 123);
        // strings parse as wall-clock times.
        let col3 = batches[0].column(2).as_primitive::<T>();
        assert_eq!(col3.null_count(), 0);
        assert_eq!(
            col3.values(),
            &[
                38,
                123,
                34016123000000 / unit_in_nanos,
                49349190855000 / unit_in_nanos,
                34016123000000 / unit_in_nanos,
                52016123000000 / unit_in_nanos
            ]
            .map(T::Native::usize_as)
        );
    }
1623
1624 #[test]
1625 fn test_times() {
1626 test_time::<Time32MillisecondType>();
1627 test_time::<Time32SecondType>();
1628 test_time::<Time64MicrosecondType>();
1629 test_time::<Time64NanosecondType>();
1630 }
1631
1632 fn test_duration<T: ArrowTemporalType>() {
1633 let buf = r#"
1634 {"a": 1, "b": "2"}
1635 {"a": 3, "b": null}
1636 "#;
1637
1638 let schema = Arc::new(Schema::new(vec![
1639 Field::new("a", T::DATA_TYPE, true),
1640 Field::new("b", T::DATA_TYPE, true),
1641 ]));
1642
1643 let batches = do_read(buf, 1024, true, false, schema);
1644 assert_eq!(batches.len(), 1);
1645
1646 let col_a = batches[0].column_by_name("a").unwrap().as_primitive::<T>();
1647 assert_eq!(col_a.null_count(), 0);
1648 assert_eq!(col_a.values(), &[1, 3].map(T::Native::usize_as));
1649
1650 let col2 = batches[0].column_by_name("b").unwrap().as_primitive::<T>();
1651 assert_eq!(col2.null_count(), 1);
1652 assert_eq!(col2.values(), &[2, 0].map(T::Native::usize_as));
1653 }
1654
1655 #[test]
1656 fn test_durations() {
1657 test_duration::<DurationNanosecondType>();
1658 test_duration::<DurationMicrosecondType>();
1659 test_duration::<DurationMillisecondType>();
1660 test_duration::<DurationSecondType>();
1661 }
1662
    /// Reads a Delta-Lake-checkpoint-style action record: only "protocol" is
    /// present in the input, so the "add" struct column must decode as null.
    #[test]
    fn test_delta_checkpoint() {
        let json = "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}";
        let schema = Arc::new(Schema::new(vec![
            Field::new_struct(
                "protocol",
                vec![
                    Field::new("minReaderVersion", DataType::Int32, true),
                    Field::new("minWriterVersion", DataType::Int32, true),
                ],
                true,
            ),
            // "add" is absent from the input, so it should come back null.
            Field::new_struct(
                "add",
                vec![Field::new_map(
                    "partitionValues",
                    "key_value",
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                    false,
                    false,
                )],
                true,
            ),
        ]));

        let batches = do_read(json, 1024, true, false, schema);
        assert_eq!(batches.len(), 1);

        // Format the single row as text to assert both the nested values and
        // the null "add" column in one comparison.
        let s: StructArray = batches.into_iter().next().unwrap().into();
        let opts = FormatOptions::default().with_null("null");
        let formatter = ArrayFormatter::try_new(&s, &opts).unwrap();
        assert_eq!(
            formatter.value(0).to_string(),
            "{protocol: {minReaderVersion: 1, minWriterVersion: 2}, add: null}"
        );
    }
1700
1701 #[test]
1702 fn struct_nullability() {
1703 let do_test = |child: DataType| {
1704 let non_null = r#"{"foo": {}}"#;
1706 let schema = Arc::new(Schema::new(vec![Field::new_struct(
1707 "foo",
1708 vec![Field::new("bar", child, false)],
1709 true,
1710 )]));
1711 let mut reader = ReaderBuilder::new(schema.clone())
1712 .build(Cursor::new(non_null.as_bytes()))
1713 .unwrap();
1714 assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": {bar: null}}"#;
1717 let mut reader = ReaderBuilder::new(schema.clone())
1718 .build(Cursor::new(null.as_bytes()))
1719 .unwrap();
1720 assert!(reader.next().unwrap().is_err()); let null = r#"{"foo": null}"#;
1724 let mut reader = ReaderBuilder::new(schema)
1725 .build(Cursor::new(null.as_bytes()))
1726 .unwrap();
1727 let batch = reader.next().unwrap().unwrap();
1728 assert_eq!(batch.num_columns(), 1);
1729 let foo = batch.column(0).as_struct();
1730 assert_eq!(foo.len(), 1);
1731 assert!(foo.is_null(0));
1732 assert_eq!(foo.num_columns(), 1);
1733
1734 let bar = foo.column(0);
1735 assert_eq!(bar.len(), 1);
1736 assert!(bar.is_null(0));
1738 };
1739
1740 do_test(DataType::Boolean);
1741 do_test(DataType::Int32);
1742 do_test(DataType::Utf8);
1743 do_test(DataType::Decimal128(2, 1));
1744 do_test(DataType::Timestamp(
1745 TimeUnit::Microsecond,
1746 Some("+00:00".into()),
1747 ));
1748 }
1749
1750 #[test]
1751 fn test_truncation() {
1752 let buf = r#"
1753 {"i64": 9223372036854775807, "u64": 18446744073709551615 }
1754 {"i64": "9223372036854775807", "u64": "18446744073709551615" }
1755 {"i64": -9223372036854775808, "u64": 0 }
1756 {"i64": "-9223372036854775808", "u64": 0 }
1757 "#;
1758
1759 let schema = Arc::new(Schema::new(vec![
1760 Field::new("i64", DataType::Int64, true),
1761 Field::new("u64", DataType::UInt64, true),
1762 ]));
1763
1764 let batches = do_read(buf, 1024, true, false, schema);
1765 assert_eq!(batches.len(), 1);
1766
1767 let i64 = batches[0].column(0).as_primitive::<Int64Type>();
1768 assert_eq!(i64.values(), &[i64::MAX, i64::MAX, i64::MIN, i64::MIN]);
1769
1770 let u64 = batches[0].column(1).as_primitive::<UInt64Type>();
1771 assert_eq!(u64.values(), &[u64::MAX, u64::MAX, u64::MIN, u64::MIN]);
1772 }
1773
1774 #[test]
1775 fn test_timestamp_truncation() {
1776 let buf = r#"
1777 {"time": 9223372036854775807 }
1778 {"time": -9223372036854775808 }
1779 {"time": 9e5 }
1780 "#;
1781
1782 let schema = Arc::new(Schema::new(vec![Field::new(
1783 "time",
1784 DataType::Timestamp(TimeUnit::Nanosecond, None),
1785 true,
1786 )]));
1787
1788 let batches = do_read(buf, 1024, true, false, schema);
1789 assert_eq!(batches.len(), 1);
1790
1791 let i64 = batches[0]
1792 .column(0)
1793 .as_primitive::<TimestampNanosecondType>();
1794 assert_eq!(i64.values(), &[i64::MAX, i64::MIN, 900000]);
1795 }
1796
1797 #[test]
1798 fn test_strict_mode_no_missing_columns_in_schema() {
1799 let buf = r#"
1800 {"a": 1, "b": "2", "c": true}
1801 {"a": 2E0, "b": "4", "c": false}
1802 "#;
1803
1804 let schema = Arc::new(Schema::new(vec![
1805 Field::new("a", DataType::Int16, false),
1806 Field::new("b", DataType::Utf8, false),
1807 Field::new("c", DataType::Boolean, false),
1808 ]));
1809
1810 let batches = do_read(buf, 1024, true, true, schema);
1811 assert_eq!(batches.len(), 1);
1812
1813 let buf = r#"
1814 {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
1815 {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
1816 "#;
1817
1818 let schema = Arc::new(Schema::new(vec![
1819 Field::new("a", DataType::Int16, false),
1820 Field::new("b", DataType::Utf8, false),
1821 Field::new_struct(
1822 "c",
1823 vec![
1824 Field::new("a", DataType::Boolean, false),
1825 Field::new("b", DataType::Int16, false),
1826 ],
1827 false,
1828 ),
1829 ]));
1830
1831 let batches = do_read(buf, 1024, true, true, schema);
1832 assert_eq!(batches.len(), 1);
1833 }
1834
    /// With strict mode, an input key absent from the schema is an error —
    /// both at the top level and inside a nested struct, where the error is
    /// prefixed with the offending field's path.
    #[test]
    fn test_strict_mode_missing_columns_in_schema() {
        let buf = r#"
        {"a": 1, "b": "2", "c": true}
        {"a": 2E0, "b": "4", "c": false}
        "#;

        // Schema omits top-level "b" entirely.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, true),
            Field::new("c", DataType::Boolean, true),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        assert_eq!(
            err.to_string(),
            "Json error: column 'b' missing from schema"
        );

        let buf = r#"
        {"a": 1, "b": "2", "c": {"a": true, "b": 1}}
        {"a": 2E0, "b": "4", "c": {"a": false, "b": 2}}
        "#;

        // Nested struct "c" omits its "b" child.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int16, false),
            Field::new("b", DataType::Utf8, false),
            Field::new_struct("c", vec![Field::new("a", DataType::Boolean, false)], false),
        ]));

        let err = ReaderBuilder::new(schema)
            .with_batch_size(1024)
            .with_strict_mode(true)
            .build(Cursor::new(buf.as_bytes()))
            .unwrap()
            .read()
            .unwrap_err();

        // The nested error carries the enclosing field's name.
        assert_eq!(
            err.to_string(),
            "Json error: whilst decoding field 'c': column 'b' missing from schema"
        );
    }
1884
1885 fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
1886 let file = File::open(path).unwrap();
1887 let mut reader = BufReader::new(file);
1888 let schema = schema.unwrap_or_else(|| {
1889 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
1890 reader.rewind().unwrap();
1891 schema
1892 });
1893 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
1894 builder.build(reader).unwrap()
1895 }
1896
    /// Smoke test: reads test/data/basic.json with an inferred schema and
    /// spot-checks the inferred column types and decoded values.
    #[test]
    fn test_json_basic() {
        let mut reader = read_file("test/data/basic.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(8, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        // The batch carries exactly the reader's (inferred) schema.
        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Inferred types: a -> Int64, b -> Float64, c -> Boolean, d -> Utf8,
        // at column indices 0..=3.
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(0, a.0);
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(1, b.0);
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(2, c.0);
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(3, d.0);
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check decoded values at a few rows.
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-3.5, bb.value(1));
        let cc = batch.column(c.0).as_boolean();
        assert!(!cc.value(0));
        assert!(cc.value(10));
        let dd = batch.column(d.0).as_string::<i32>();
        assert_eq!("4", dd.value(0));
        assert_eq!("text", dd.value(8));
    }
1935
1936 #[test]
1937 fn test_json_empty_projection() {
1938 let mut reader = read_file("test/data/basic.json", Some(Schema::empty()));
1939 let batch = reader.next().unwrap().unwrap();
1940
1941 assert_eq!(0, batch.num_columns());
1942 assert_eq!(12, batch.num_rows());
1943 }
1944
    /// Reads test/data/basic_nulls.json with an inferred schema and checks
    /// that per-column validity bitmaps track the nulls in the file.
    #[test]
    fn test_json_basic_with_nulls() {
        let mut reader = read_file("test/data/basic_nulls.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(12, batch.num_rows());

        let schema = reader.schema();
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);

        // Types are still inferred despite interspersed nulls.
        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(&DataType::Float64, b.1.data_type());
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(&DataType::Boolean, c.1.data_type());
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        // Spot-check validity at a few rows per column.
        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert!(aa.is_valid(0));
        assert!(!aa.is_valid(1));
        assert!(!aa.is_valid(11));
        let bb = batch.column(b.0).as_primitive::<Float64Type>();
        assert!(bb.is_valid(0));
        assert!(!bb.is_valid(2));
        assert!(!bb.is_valid(11));
        let cc = batch.column(c.0).as_boolean();
        assert!(cc.is_valid(0));
        assert!(!cc.is_valid(4));
        assert!(!cc.is_valid(11));
        let dd = batch.column(d.0).as_string::<i32>();
        assert!(!dd.is_valid(0));
        assert!(dd.is_valid(1));
        assert!(!dd.is_valid(4));
        assert!(!dd.is_valid(11));
    }
1984
1985 #[test]
1986 fn test_json_basic_schema() {
1987 let schema = Schema::new(vec![
1988 Field::new("a", DataType::Int64, true),
1989 Field::new("b", DataType::Float32, false),
1990 Field::new("c", DataType::Boolean, false),
1991 Field::new("d", DataType::Utf8, false),
1992 ]);
1993
1994 let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
1995 let reader_schema = reader.schema();
1996 assert_eq!(reader_schema.as_ref(), &schema);
1997 let batch = reader.next().unwrap().unwrap();
1998
1999 assert_eq!(4, batch.num_columns());
2000 assert_eq!(12, batch.num_rows());
2001
2002 let schema = batch.schema();
2003
2004 let a = schema.column_with_name("a").unwrap();
2005 assert_eq!(&DataType::Int64, a.1.data_type());
2006 let b = schema.column_with_name("b").unwrap();
2007 assert_eq!(&DataType::Float32, b.1.data_type());
2008 let c = schema.column_with_name("c").unwrap();
2009 assert_eq!(&DataType::Boolean, c.1.data_type());
2010 let d = schema.column_with_name("d").unwrap();
2011 assert_eq!(&DataType::Utf8, d.1.data_type());
2012
2013 let aa = batch.column(a.0).as_primitive::<Int64Type>();
2014 assert_eq!(1, aa.value(0));
2015 assert_eq!(100000000000000, aa.value(11));
2016 let bb = batch.column(b.0).as_primitive::<Float32Type>();
2017 assert_eq!(2.0, bb.value(0));
2018 assert_eq!(-3.5, bb.value(1));
2019 }
2020
2021 #[test]
2022 fn test_json_basic_schema_projection() {
2023 let schema = Schema::new(vec![
2024 Field::new("a", DataType::Int64, true),
2025 Field::new("c", DataType::Boolean, false),
2026 ]);
2027
2028 let mut reader = read_file("test/data/basic.json", Some(schema.clone()));
2029 let batch = reader.next().unwrap().unwrap();
2030
2031 assert_eq!(2, batch.num_columns());
2032 assert_eq!(2, batch.schema().fields().len());
2033 assert_eq!(12, batch.num_rows());
2034
2035 assert_eq!(batch.schema().as_ref(), &schema);
2036
2037 let a = schema.column_with_name("a").unwrap();
2038 assert_eq!(0, a.0);
2039 assert_eq!(&DataType::Int64, a.1.data_type());
2040 let c = schema.column_with_name("c").unwrap();
2041 assert_eq!(1, c.0);
2042 assert_eq!(&DataType::Boolean, c.1.data_type());
2043 }
2044
    /// Reads test/data/arrays.json with an inferred schema: list columns are
    /// inferred as `List<Float64>` / `List<Boolean>` and the flattened child
    /// arrays are spot-checked.
    #[test]
    fn test_json_arrays() {
        let mut reader = read_file("test/data/arrays.json", None);
        let batch = reader.next().unwrap().unwrap();

        assert_eq!(4, batch.num_columns());
        assert_eq!(3, batch.num_rows());

        let schema = batch.schema();

        let a = schema.column_with_name("a").unwrap();
        assert_eq!(&DataType::Int64, a.1.data_type());
        let b = schema.column_with_name("b").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Float64, true))),
            b.1.data_type()
        );
        let c = schema.column_with_name("c").unwrap();
        assert_eq!(
            &DataType::List(Arc::new(Field::new_list_field(DataType::Boolean, true))),
            c.1.data_type()
        );
        let d = schema.column_with_name("d").unwrap();
        assert_eq!(&DataType::Utf8, d.1.data_type());

        let aa = batch.column(a.0).as_primitive::<Int64Type>();
        assert_eq!(1, aa.value(0));
        assert_eq!(-10, aa.value(1));
        assert_eq!(1627668684594000000, aa.value(2));
        // "b": check values and a null in the flattened Float64 child.
        let bb = batch.column(b.0).as_list::<i32>();
        let bb = bb.values().as_primitive::<Float64Type>();
        assert_eq!(9, bb.len());
        assert_eq!(2.0, bb.value(0));
        assert_eq!(-6.1, bb.value(5));
        assert!(!bb.is_valid(7));

        // "c": same idea, accessed via an explicit downcast instead of
        // the as_list helper.
        let cc = batch
            .column(c.0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let cc = cc.values().as_boolean();
        assert_eq!(6, cc.len());
        assert!(!cc.value(0));
        assert!(!cc.value(4));
        assert!(!cc.is_valid(5));
    }
2092
2093 #[test]
2094 fn test_empty_json_arrays() {
2095 let json_content = r#"
2096 {"items": []}
2097 {"items": null}
2098 {}
2099 "#;
2100
2101 let schema = Arc::new(Schema::new(vec![Field::new(
2102 "items",
2103 DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2104 true,
2105 )]));
2106
2107 let batches = do_read(json_content, 1024, false, false, schema);
2108 assert_eq!(batches.len(), 1);
2109
2110 let col1 = batches[0].column(0).as_list::<i32>();
2111 assert_eq!(col1.null_count(), 2);
2112 assert!(col1.value(0).is_empty());
2113 assert_eq!(col1.value(0).data_type(), &DataType::Null);
2114 assert!(col1.is_null(1));
2115 assert!(col1.is_null(2));
2116 }
2117
2118 #[test]
2119 fn test_nested_empty_json_arrays() {
2120 let json_content = r#"
2121 {"items": [[],[]]}
2122 {"items": [[null, null],[null]]}
2123 "#;
2124
2125 let schema = Arc::new(Schema::new(vec![Field::new(
2126 "items",
2127 DataType::List(FieldRef::new(Field::new_list_field(
2128 DataType::List(FieldRef::new(Field::new_list_field(DataType::Null, true))),
2129 true,
2130 ))),
2131 true,
2132 )]));
2133
2134 let batches = do_read(json_content, 1024, false, false, schema);
2135 assert_eq!(batches.len(), 1);
2136
2137 let col1 = batches[0].column(0).as_list::<i32>();
2138 assert_eq!(col1.null_count(), 0);
2139 assert_eq!(col1.value(0).len(), 2);
2140 assert!(col1.value(0).as_list::<i32>().value(0).is_empty());
2141 assert!(col1.value(0).as_list::<i32>().value(1).is_empty());
2142
2143 assert_eq!(col1.value(1).len(), 2);
2144 assert_eq!(col1.value(1).as_list::<i32>().value(0).len(), 2);
2145 assert_eq!(col1.value(1).as_list::<i32>().value(1).len(), 1);
2146 }
2147
    /// Reads a `List<Struct<b: Boolean, c: Struct<d: Utf8>>>` column and
    /// compares the result against a hand-built expected array, including
    /// offsets and null buffers at every nesting level.
    #[test]
    fn test_nested_list_json_arrays() {
        let c_field = Field::new_struct("c", vec![Field::new("d", DataType::Utf8, true)], true);
        let a_struct_field = Field::new_struct(
            "a",
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()],
            true,
        );
        let a_field = Field::new("a", DataType::List(Arc::new(a_struct_field.clone())), true);
        let schema = Arc::new(Schema::new(vec![a_field.clone()]));
        let builder = ReaderBuilder::new(schema).with_batch_size(64);
        // 6 list rows containing 7 struct elements in total, with a null
        // list (row 3), an empty list (row 4), and a null element (row 5).
        let json_content = r#"
        {"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
        {"a": [{"b": false, "c": null}]}
        {"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
        {"a": null}
        {"a": []}
        {"a": [null]}
        "#;
        let mut reader = builder.build(Cursor::new(json_content)).unwrap();

        // Expected flattened children: 7 values of "d" ...
        let d = StringArray::from(vec![
            Some("a_text"),
            Some("b_text"),
            None,
            Some("c_text"),
            Some("d_text"),
            None,
            None,
        ]);
        // ... wrapped in "c" structs (null at element 2 and the trailing
        // null element 6) ...
        let c = StructArray::new(
            vec![Field::new("d", DataType::Utf8, true)].into(),
            vec![Arc::new(d.clone()) as ArrayRef],
            Some(NullBuffer::from(vec![
                true, true, false, true, true, true, false,
            ])),
        );
        // ... alongside 7 values of "b" ...
        let b = BooleanArray::from(vec![
            Some(true),
            Some(false),
            Some(false),
            Some(true),
            None,
            Some(true),
            None,
        ]);
        // ... combined into the element struct (only element 6 is null) ...
        let a = StructArray::new(
            vec![Field::new("b", DataType::Boolean, true), c_field.clone()].into(),
            vec![
                Arc::new(b.clone()) as ArrayRef,
                Arc::new(c.clone()) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![
                true, true, true, true, true, true, false,
            ])),
        );
        // ... and sliced into 6 list rows: lengths 2, 1, 3, (null), 0, 1.
        let a_list = ListArray::new(
            Arc::new(a_struct_field.clone()),
            OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 3, 6, 6, 6, 7])),
            Arc::new(a),
            Some(NullBuffer::from(vec![true, true, true, false, true, true])),
        );

        let batch = reader.next().unwrap().unwrap();
        let read = batch.column(0);
        assert_eq!(read.len(), 6);
        let read: &ListArray = read.as_list::<i32>();
        let expected = &a_list;
        assert_eq!(read.value_offsets(), &[0, 2, 3, 6, 6, 6, 7]);
        assert_eq!(read.nulls(), expected.nulls());
        // Compare the flattened struct children piecewise before comparing
        // the whole arrays, for better failure localisation.
        let struct_array = read.values().as_struct();
        let expected_struct_array = expected.values().as_struct();

        assert_eq!(7, struct_array.len());
        assert_eq!(1, struct_array.null_count());
        assert_eq!(7, expected_struct_array.len());
        assert_eq!(1, expected_struct_array.null_count());
        assert_eq!(struct_array.nulls(), expected_struct_array.nulls());
        let read_b = struct_array.column(0);
        assert_eq!(read_b.as_ref(), &b);
        let read_c = struct_array.column(1);
        assert_eq!(read_c.as_struct(), &c);
        let read_c = read_c.as_struct();
        let read_d = read_c.column(0);
        assert_eq!(read_d.as_ref(), &d);

        assert_eq!(read, expected);
    }
2243
    /// Shared ListView read test, parameterised over the offset width `O`.
    ///
    /// Verifies the offsets/sizes layout and the per-row values for
    /// populated, null-containing, null, single-element, and empty rows.
    fn assert_read_list_view<O: OffsetSizeTrait>() {
        let field = Arc::new(Field::new("item", DataType::Int32, true));
        let data_type = GenericListViewArray::<O>::DATA_TYPE_CONSTRUCTOR(field.clone());
        let schema = Arc::new(Schema::new(vec![Field::new("lv", data_type, true)]));

        let buf = r#"
        {"lv": [1, 2, 3]}
        {"lv": [4, null]}
        {"lv": null}
        {"lv": [6]}
        {"lv": []}
        "#;

        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];
        let col = batch.column(0);
        let list_view = col
            .as_any()
            .downcast_ref::<GenericListViewArray<O>>()
            .unwrap();

        assert_eq!(list_view.len(), 5);

        // Views are (offset, size) pairs; the null row (index 2) and the
        // empty row (index 4) both have size 0.
        let expected_offsets: Vec<O> = vec![0, 3, 5, 5, 6]
            .into_iter()
            .map(|v| O::usize_as(v))
            .collect();
        let expected_sizes: Vec<O> = vec![3, 2, 0, 1, 0]
            .into_iter()
            .map(|v| O::usize_as(v))
            .collect();
        assert_eq!(list_view.value_offsets(), &expected_offsets);
        assert_eq!(list_view.value_sizes(), &expected_sizes);

        // Row 0: [1, 2, 3].
        assert!(list_view.is_valid(0));
        let vals = list_view.value(0);
        let ints = vals.as_primitive::<Int32Type>();
        assert_eq!(ints.values(), &[1, 2, 3]);

        // Row 1: [4, null].
        assert!(list_view.is_valid(1));
        let vals = list_view.value(1);
        let ints = vals.as_primitive::<Int32Type>();
        assert_eq!(ints.len(), 2);
        assert_eq!(ints.value(0), 4);
        assert!(ints.is_null(1));

        // Row 2: null.
        assert!(list_view.is_null(2));

        // Row 3: [6].
        assert!(list_view.is_valid(3));
        let vals = list_view.value(3);
        let ints = vals.as_primitive::<Int32Type>();
        assert_eq!(ints.values(), &[6]);

        // Row 4: [] — valid but empty.
        assert!(list_view.is_valid(4));
        let vals = list_view.value(4);
        assert_eq!(vals.len(), 0);
    }
2308
2309 #[test]
2310 fn test_read_list_view() {
2311 assert_read_list_view::<i32>();
2312 assert_read_list_view::<i64>();
2313 }
2314
2315 #[test]
2316 fn test_fixed_size_list() {
2317 let buf = r#"
2318 {"a": [1, 2, 3]}
2319 {"a": [4, 5, 6]}
2320 {"a": [7, 8, 9]}
2321 "#;
2322
2323 let field = Field::new_list_field(DataType::Int32, true);
2324 let schema = Arc::new(Schema::new(vec![Field::new(
2325 "a",
2326 DataType::FixedSizeList(Arc::new(field), 3),
2327 false,
2328 )]));
2329
2330 let batches = do_read(buf, 1024, false, false, schema);
2331 assert_eq!(batches.len(), 1);
2332
2333 let col = batches[0].column(0).as_fixed_size_list();
2334 assert_eq!(col.len(), 3);
2335 assert_eq!(col.value_length(), 3);
2336
2337 let values = col.values().as_primitive::<Int32Type>();
2338 assert_eq!(values.values(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
2339 }
2340
2341 #[test]
2342 fn test_fixed_size_list_nullable() {
2343 let buf = r#"
2344 {"a": [1, 2]}
2345 {"a": null}
2346 {"a": [3, null]}
2347 "#;
2348
2349 let field = Field::new_list_field(DataType::Int32, true);
2350 let schema = Arc::new(Schema::new(vec![Field::new(
2351 "a",
2352 DataType::FixedSizeList(Arc::new(field), 2),
2353 true,
2354 )]));
2355
2356 let batches = do_read(buf, 1024, false, false, schema);
2357 assert_eq!(batches.len(), 1);
2358
2359 let col = batches[0].column(0).as_fixed_size_list();
2360 assert_eq!(col.len(), 3);
2361 assert!(col.is_valid(0));
2362 assert!(col.is_null(1));
2363 assert!(col.is_valid(2));
2364
2365 let values = col.values().as_primitive::<Int32Type>();
2366 assert_eq!(values.value(0), 1);
2367 assert_eq!(values.value(1), 2);
2368 assert_eq!(values.value(4), 3);
2369 assert!(values.is_null(5));
2370 }
2371
2372 #[test]
2373 fn test_fixed_size_list_zero_size_non_nullable() {
2374 let buf = r#"
2375 {"a": []}
2376 {"a": []}
2377 {"a": []}
2378 "#;
2379
2380 let field = Field::new_list_field(DataType::Int32, true);
2381 let schema = Arc::new(Schema::new(vec![Field::new(
2382 "a",
2383 DataType::FixedSizeList(Arc::new(field), 0),
2384 false,
2385 )]));
2386
2387 let batches = do_read(buf, 1024, false, false, schema);
2388 assert_eq!(batches.len(), 1);
2389
2390 let col = batches[0].column(0).as_fixed_size_list();
2391 assert_eq!(col.len(), 3);
2392 assert_eq!(col.value_length(), 0);
2393
2394 let values = col.values().as_primitive::<Int32Type>();
2395 assert!(values.values().is_empty());
2396 }
2397
2398 #[test]
2399 fn test_fixed_size_list_wrong_size() {
2400 let buf = r#"{"a": [1, 2, 3]}"#;
2401
2402 let field = Field::new_list_field(DataType::Int32, true);
2403 let schema = Arc::new(Schema::new(vec![Field::new(
2404 "a",
2405 DataType::FixedSizeList(Arc::new(field), 2),
2406 false,
2407 )]));
2408
2409 let err = ReaderBuilder::new(schema)
2410 .build(Cursor::new(buf.as_bytes()))
2411 .unwrap()
2412 .next()
2413 .unwrap()
2414 .unwrap_err();
2415
2416 assert!(err.to_string().contains("expected 2 but got 3"), "{}", err);
2417 }
2418
2419 #[test]
2420 fn test_fixed_size_list_nested() {
2421 let buf = r#"
2422 {"a": [[1, 2], [3, 4]]}
2423 {"a": [[5, 6], [7, 8]]}
2424 "#;
2425
2426 let inner_field = Field::new_list_field(DataType::Int32, true);
2427 let inner_type = DataType::FixedSizeList(Arc::new(inner_field), 2);
2428 let outer_field = Arc::new(Field::new_list_field(inner_type.clone(), true));
2429 let schema = Arc::new(Schema::new(vec![Field::new(
2430 "a",
2431 DataType::FixedSizeList(outer_field, 2),
2432 false,
2433 )]));
2434
2435 let batches = do_read(buf, 1024, false, false, schema);
2436 assert_eq!(batches.len(), 1);
2437
2438 let col = batches[0].column(0).as_fixed_size_list();
2439 assert_eq!(col.len(), 2);
2440 assert_eq!(col.value_length(), 2);
2441
2442 let inner = col.values().as_fixed_size_list();
2443 assert_eq!(inner.len(), 4);
2444 assert_eq!(inner.value_length(), 2);
2445
2446 let values = inner.values().as_primitive::<Int32Type>();
2447 assert_eq!(values.values(), &[1, 2, 3, 4, 5, 6, 7, 8]);
2448 }
2449
2450 #[test]
2451 fn test_fixed_size_list_ignore_type_conflicts() {
2452 let field = Field::new("item", DataType::Int32, true);
2453 let schema = Arc::new(Schema::new(vec![Field::new(
2454 "a",
2455 DataType::FixedSizeList(Arc::new(field), 2),
2456 true,
2457 )]));
2458
2459 let json = vec![
2460 json!({"a": [1, 2]}),
2461 json!({"a": "not a list"}),
2462 json!({"a": 42}),
2463 json!({"a": [6, 7]}),
2464 ];
2465
2466 let mut decoder = ReaderBuilder::new(schema)
2467 .with_ignore_type_conflicts(true)
2468 .build_decoder()
2469 .unwrap();
2470 decoder.serialize(&json).unwrap();
2471 let batch = decoder.flush().unwrap().unwrap();
2472
2473 let col = batch.column(0).as_fixed_size_list();
2474 assert_eq!(col.len(), 4);
2475 assert!(col.is_valid(0));
2476 assert!(col.is_null(1)); assert!(col.is_null(2)); assert!(col.is_valid(3));
2479
2480 let values = col.values().as_primitive::<Int32Type>();
2481 assert_eq!(values.value(0), 1);
2482 assert_eq!(values.value(1), 2);
2483 assert_eq!(values.value(6), 6);
2484 assert_eq!(values.value(7), 7);
2485 }
2486
2487 #[test]
2488 fn test_skip_empty_lines() {
2489 let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
2490 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(64);
2491 let json_content = "
2492 {\"a\": 1}
2493 {\"a\": 2}
2494 {\"a\": 3}";
2495 let mut reader = builder.build(Cursor::new(json_content)).unwrap();
2496 let batch = reader.next().unwrap().unwrap();
2497
2498 assert_eq!(1, batch.num_columns());
2499 assert_eq!(3, batch.num_rows());
2500
2501 let schema = reader.schema();
2502 let c = schema.column_with_name("a").unwrap();
2503 assert_eq!(&DataType::Int64, c.1.data_type());
2504 }
2505
2506 #[test]
2507 fn test_with_multiple_batches() {
2508 let file = File::open("test/data/basic_nulls.json").unwrap();
2509 let mut reader = BufReader::new(file);
2510 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2511 reader.rewind().unwrap();
2512
2513 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2514 let mut reader = builder.build(reader).unwrap();
2515
2516 let mut num_records = Vec::new();
2517 while let Some(rb) = reader.next().transpose().unwrap() {
2518 num_records.push(rb.num_rows());
2519 }
2520
2521 assert_eq!(vec![5, 5, 2], num_records);
2522 }
2523
2524 #[test]
2525 fn test_timestamp_from_json_seconds() {
2526 let schema = Schema::new(vec![Field::new(
2527 "a",
2528 DataType::Timestamp(TimeUnit::Second, None),
2529 true,
2530 )]);
2531
2532 let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2533 let batch = reader.next().unwrap().unwrap();
2534
2535 assert_eq!(1, batch.num_columns());
2536 assert_eq!(12, batch.num_rows());
2537
2538 let schema = reader.schema();
2539 let batch_schema = batch.schema();
2540 assert_eq!(schema, batch_schema);
2541
2542 let a = schema.column_with_name("a").unwrap();
2543 assert_eq!(
2544 &DataType::Timestamp(TimeUnit::Second, None),
2545 a.1.data_type()
2546 );
2547
2548 let aa = batch.column(a.0).as_primitive::<TimestampSecondType>();
2549 assert!(aa.is_valid(0));
2550 assert!(!aa.is_valid(1));
2551 assert!(!aa.is_valid(2));
2552 assert_eq!(1, aa.value(0));
2553 assert_eq!(1, aa.value(3));
2554 assert_eq!(5, aa.value(7));
2555 }
2556
2557 #[test]
2558 fn test_timestamp_from_json_milliseconds() {
2559 let schema = Schema::new(vec![Field::new(
2560 "a",
2561 DataType::Timestamp(TimeUnit::Millisecond, None),
2562 true,
2563 )]);
2564
2565 let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2566 let batch = reader.next().unwrap().unwrap();
2567
2568 assert_eq!(1, batch.num_columns());
2569 assert_eq!(12, batch.num_rows());
2570
2571 let schema = reader.schema();
2572 let batch_schema = batch.schema();
2573 assert_eq!(schema, batch_schema);
2574
2575 let a = schema.column_with_name("a").unwrap();
2576 assert_eq!(
2577 &DataType::Timestamp(TimeUnit::Millisecond, None),
2578 a.1.data_type()
2579 );
2580
2581 let aa = batch.column(a.0).as_primitive::<TimestampMillisecondType>();
2582 assert!(aa.is_valid(0));
2583 assert!(!aa.is_valid(1));
2584 assert!(!aa.is_valid(2));
2585 assert_eq!(1, aa.value(0));
2586 assert_eq!(1, aa.value(3));
2587 assert_eq!(5, aa.value(7));
2588 }
2589
2590 #[test]
2591 fn test_date_from_json_milliseconds() {
2592 let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
2593
2594 let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2595 let batch = reader.next().unwrap().unwrap();
2596
2597 assert_eq!(1, batch.num_columns());
2598 assert_eq!(12, batch.num_rows());
2599
2600 let schema = reader.schema();
2601 let batch_schema = batch.schema();
2602 assert_eq!(schema, batch_schema);
2603
2604 let a = schema.column_with_name("a").unwrap();
2605 assert_eq!(&DataType::Date64, a.1.data_type());
2606
2607 let aa = batch.column(a.0).as_primitive::<Date64Type>();
2608 assert!(aa.is_valid(0));
2609 assert!(!aa.is_valid(1));
2610 assert!(!aa.is_valid(2));
2611 assert_eq!(1, aa.value(0));
2612 assert_eq!(1, aa.value(3));
2613 assert_eq!(5, aa.value(7));
2614 }
2615
2616 #[test]
2617 fn test_time_from_json_nanoseconds() {
2618 let schema = Schema::new(vec![Field::new(
2619 "a",
2620 DataType::Time64(TimeUnit::Nanosecond),
2621 true,
2622 )]);
2623
2624 let mut reader = read_file("test/data/basic_nulls.json", Some(schema));
2625 let batch = reader.next().unwrap().unwrap();
2626
2627 assert_eq!(1, batch.num_columns());
2628 assert_eq!(12, batch.num_rows());
2629
2630 let schema = reader.schema();
2631 let batch_schema = batch.schema();
2632 assert_eq!(schema, batch_schema);
2633
2634 let a = schema.column_with_name("a").unwrap();
2635 assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
2636
2637 let aa = batch.column(a.0).as_primitive::<Time64NanosecondType>();
2638 assert!(aa.is_valid(0));
2639 assert!(!aa.is_valid(1));
2640 assert!(!aa.is_valid(2));
2641 assert_eq!(1, aa.value(0));
2642 assert_eq!(1, aa.value(3));
2643 assert_eq!(5, aa.value(7));
2644 }
2645
2646 #[test]
2647 fn test_json_iterator() {
2648 let file = File::open("test/data/basic.json").unwrap();
2649 let mut reader = BufReader::new(file);
2650 let (schema, _) = infer_json_schema(&mut reader, None).unwrap();
2651 reader.rewind().unwrap();
2652
2653 let builder = ReaderBuilder::new(Arc::new(schema)).with_batch_size(5);
2654 let reader = builder.build(reader).unwrap();
2655 let schema = reader.schema();
2656 let (col_a_index, _) = schema.column_with_name("a").unwrap();
2657
2658 let mut sum_num_rows = 0;
2659 let mut num_batches = 0;
2660 let mut sum_a = 0;
2661 for batch in reader {
2662 let batch = batch.unwrap();
2663 assert_eq!(8, batch.num_columns());
2664 sum_num_rows += batch.num_rows();
2665 num_batches += 1;
2666 let batch_schema = batch.schema();
2667 assert_eq!(schema, batch_schema);
2668 let a_array = batch.column(col_a_index).as_primitive::<Int64Type>();
2669 sum_a += (0..a_array.len()).map(|i| a_array.value(i)).sum::<i64>();
2670 }
2671 assert_eq!(12, sum_num_rows);
2672 assert_eq!(3, num_batches);
2673 assert_eq!(100000000000011, sum_a);
2674 }
2675
    #[test]
    fn test_decoder_error() {
        let schema = Arc::new(Schema::new(vec![Field::new_struct(
            "a",
            vec![Field::new("child", DataType::Int32, false)],
            true,
        )]));

        // A truncated record stays buffered as a partial row, and a failed
        // flush must not discard it.
        let mut decoder = ReaderBuilder::new(schema.clone()).build_decoder().unwrap();
        let _ = decoder.decode(r#"{"a": { "child":"#.as_bytes()).unwrap();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);
        let _ = decoder.flush().unwrap_err();
        assert!(decoder.tape_decoder.has_partial_row());
        assert_eq!(decoder.tape_decoder.num_buffered_rows(), 1);

        // Helper: decode a single complete record and return the resulting
        // error message.
        let parse_err = |s: &str| {
            ReaderBuilder::new(schema.clone())
                .build(Cursor::new(s.as_bytes()))
                .unwrap()
                .next()
                .unwrap()
                .unwrap_err()
                .to_string()
        };

        // Non-object values for a struct field: the error echoes the
        // offending value as it appears on the tape.
        let err = parse_err(r#"{"a": 123}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got 123"
        );

        let err = parse_err(r#"{"a": ["bar"]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got ["bar"]"#
        );

        let err = parse_err(r#"{"a": []}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got []"
        );

        let err = parse_err(r#"{"a": [{"child": 234}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": 234}]"#
        );

        // Deeply nested wrong value: the full nested literal is reproduced.
        let err = parse_err(r#"{"a": [{"child": {"foo": [{"foo": ["bar"]}]}}]}"#);
        assert_eq!(
            err,
            r#"Json error: whilst decoding field 'a': expected { got [{"child": {"foo": [{"foo": ["bar"]}]}}]"#
        );

        let err = parse_err(r#"{"a": true}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got true"
        );

        let err = parse_err(r#"{"a": false}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got false"
        );

        let err = parse_err(r#"{"a": "foo"}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': expected { got \"foo\""
        );

        // Wrong value for the nested primitive child: the error chain names
        // both enclosing fields.
        let err = parse_err(r#"{"a": {"child": false}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got false"
        );

        let err = parse_err(r#"{"a": {"child": []}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got []"
        );

        let err = parse_err(r#"{"a": {"child": [123]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123]"
        );

        let err = parse_err(r#"{"a": {"child": [123, 3465346]}}"#);
        assert_eq!(
            err,
            "Json error: whilst decoding field 'a': whilst decoding field 'child': expected primitive got [123, 3465346]"
        );
    }
2774
2775 #[test]
2776 fn test_serialize_timestamp() {
2777 let json = vec![
2778 json!({"timestamp": 1681319393}),
2779 json!({"timestamp": "1970-01-01T00:00:00+02:00"}),
2780 ];
2781 let schema = Schema::new(vec![Field::new(
2782 "timestamp",
2783 DataType::Timestamp(TimeUnit::Second, None),
2784 true,
2785 )]);
2786 let mut decoder = ReaderBuilder::new(Arc::new(schema))
2787 .build_decoder()
2788 .unwrap();
2789 decoder.serialize(&json).unwrap();
2790 let batch = decoder.flush().unwrap().unwrap();
2791 assert_eq!(batch.num_rows(), 2);
2792 assert_eq!(batch.num_columns(), 1);
2793 let values = batch.column(0).as_primitive::<TimestampSecondType>();
2794 assert_eq!(values.values(), &[1681319393, -7200]);
2795 }
2796
2797 #[test]
2798 fn test_serialize_decimal() {
2799 let json = vec![
2800 json!({"decimal": 1.234}),
2801 json!({"decimal": "1.234"}),
2802 json!({"decimal": 1234}),
2803 json!({"decimal": "1234"}),
2804 ];
2805 let schema = Schema::new(vec![Field::new(
2806 "decimal",
2807 DataType::Decimal128(10, 3),
2808 true,
2809 )]);
2810 let mut decoder = ReaderBuilder::new(Arc::new(schema))
2811 .build_decoder()
2812 .unwrap();
2813 decoder.serialize(&json).unwrap();
2814 let batch = decoder.flush().unwrap().unwrap();
2815 assert_eq!(batch.num_rows(), 4);
2816 assert_eq!(batch.num_columns(), 1);
2817 let values = batch.column(0).as_primitive::<Decimal128Type>();
2818 assert_eq!(values.values(), &[1234, 1234, 1234000, 1234000]);
2819 }
2820
2821 #[test]
2822 fn test_serde_field() {
2823 let field = Field::new("int", DataType::Int32, true);
2824 let mut decoder = ReaderBuilder::new_with_field(field)
2825 .build_decoder()
2826 .unwrap();
2827 decoder.serialize(&[1_i32, 2, 3, 4]).unwrap();
2828 let b = decoder.flush().unwrap().unwrap();
2829 let values = b.column(0).as_primitive::<Int32Type>().values();
2830 assert_eq!(values, &[1, 2, 3, 4]);
2831 }
2832
2833 #[test]
2834 fn test_serde_large_numbers() {
2835 let field = Field::new("int", DataType::Int64, true);
2836 let mut decoder = ReaderBuilder::new_with_field(field)
2837 .build_decoder()
2838 .unwrap();
2839
2840 decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2841 let b = decoder.flush().unwrap().unwrap();
2842 let values = b.column(0).as_primitive::<Int64Type>().values();
2843 assert_eq!(values, &[1699148028689, 2, 3, 4]);
2844
2845 let field = Field::new(
2846 "int",
2847 DataType::Timestamp(TimeUnit::Microsecond, None),
2848 true,
2849 );
2850 let mut decoder = ReaderBuilder::new_with_field(field)
2851 .build_decoder()
2852 .unwrap();
2853
2854 decoder.serialize(&[1699148028689_u64, 2, 3, 4]).unwrap();
2855 let b = decoder.flush().unwrap().unwrap();
2856 let values = b
2857 .column(0)
2858 .as_primitive::<TimestampMicrosecondType>()
2859 .values();
2860 assert_eq!(values, &[1699148028689, 2, 3, 4]);
2861 }
2862
    #[test]
    fn test_coercing_primitive_into_string_decoder() {
        // With coerce_primitive enabled, numeric and boolean JSON values can
        // be decoded into Utf8 columns as their literal text.
        let buf = &format!(
            r#"[{{"a": 1, "b": "A", "c": "T"}}, {{"a": 2, "b": "BB", "c": "F"}}, {{"a": {}, "b": 123, "c": false}}, {{"a": {}, "b": 789, "c": true}}]"#,
            (i32::MAX as i64 + 10),
            i64::MAX - 10
        );
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float64, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]);
        let json_array: Vec<serde_json::Value> = serde_json::from_str(buf).unwrap();
        let schema_ref = Arc::new(schema);

        let reader = ReaderBuilder::new(schema_ref.clone()).with_coerce_primitive(true);
        let mut decoder = reader.build_decoder().unwrap();
        decoder.serialize(json_array.as_slice()).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(
            batch,
            RecordBatch::try_new(
                schema_ref,
                vec![
                    // Integers larger than i32 decode (lossily) into f64.
                    Arc::new(Float64Array::from(vec![
                        1.0,
                        2.0,
                        (i32::MAX as i64 + 10) as f64,
                        (i64::MAX - 10) as f64
                    ])),
                    // Numbers and booleans coerced to their string spellings.
                    Arc::new(StringArray::from(vec!["A", "BB", "123", "789"])),
                    Arc::new(StringArray::from(vec!["T", "F", "false", "true"])),
                ]
            )
            .unwrap()
        );
    }
2901
2902 fn _parse_structs(
2907 row: &str,
2908 struct_mode: StructMode,
2909 fields: Fields,
2910 as_struct: bool,
2911 ) -> Result<RecordBatch, ArrowError> {
2912 let builder = if as_struct {
2913 ReaderBuilder::new_with_field(Field::new("r", DataType::Struct(fields), true))
2914 } else {
2915 ReaderBuilder::new(Arc::new(Schema::new(fields)))
2916 };
2917 builder
2918 .with_struct_mode(struct_mode)
2919 .build(Cursor::new(row.as_bytes()))
2920 .unwrap()
2921 .next()
2922 .unwrap()
2923 }
2924
    #[test]
    fn test_struct_decoding_list_length() {
        use arrow_array::array;

        // A two-element row decoded in ListOnly mode against schemas with
        // one (too few), two (correct) and three (too many) fields.
        let row = "[1, 2]";

        let mut fields = vec![Field::new("a", DataType::Int32, true)];
        let too_few_fields = Fields::from(fields.clone());
        fields.push(Field::new("b", DataType::Int32, true));
        let correct_fields = Fields::from(fields.clone());
        fields.push(Field::new("c", DataType::Int32, true));
        let too_many_fields = Fields::from(fields.clone());

        let parse = |fields: Fields, as_struct: bool| {
            _parse_structs(row, StructMode::ListOnly, fields, as_struct)
        };

        let expected_row = StructArray::new(
            correct_fields.clone(),
            vec![
                Arc::new(array::Int32Array::from(vec![1])),
                Arc::new(array::Int32Array::from(vec![2])),
            ],
            None,
        );
        let row_field = Field::new("r", DataType::Struct(correct_fields.clone()), true);

        // Too few fields errors, both nested and top-level.
        assert_eq!(
            parse(too_few_fields.clone(), true).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        assert_eq!(
            parse(too_few_fields, false).unwrap_err().to_string(),
            "Json error: found extra columns for 1 fields".to_string()
        );
        // Matching field count decodes successfully in both shapes.
        assert_eq!(
            parse(correct_fields.clone(), true).unwrap(),
            RecordBatch::try_new(
                Arc::new(Schema::new(vec![row_field])),
                vec![Arc::new(expected_row.clone())]
            )
            .unwrap()
        );
        assert_eq!(
            parse(correct_fields, false).unwrap(),
            RecordBatch::from(expected_row)
        );
        // Too many fields reports the column/field count mismatch.
        assert_eq!(
            parse(too_many_fields.clone(), true)
                .unwrap_err()
                .to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
        assert_eq!(
            parse(too_many_fields, false).unwrap_err().to_string(),
            "Json error: found 2 columns for 3 fields".to_string()
        );
    }
2983
    #[test]
    fn test_struct_decoding() {
        use arrow_array::builder;

        // The same logical record expressed three ways: fully as objects,
        // fully as lists, and a mix of the two.
        let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
        let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
        let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;

        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::new_list(DataType::Int32, true), true),
            Field::new_map(
                "c",
                "entries",
                Field::new("keys", DataType::Utf8, false),
                Field::new("values", DataType::Int32, true),
                false,
                false,
            ),
        ]);

        let list_array =
            ListArray::from_iter_primitive::<Int32Type, _, _>(vec![Some(vec![Some(1), Some(2)])]);

        // Expected map column: a single entry {"d": 3}.
        let map_array = {
            let mut map_builder = builder::MapBuilder::new(
                None,
                builder::StringBuilder::new(),
                builder::Int32Builder::new(),
            );
            map_builder.keys().append_value("d");
            map_builder.values().append_value(3);
            map_builder.append(true).unwrap();
            map_builder.finish()
        };

        let struct_array = StructArray::new(
            struct_fields.clone(),
            vec![Arc::new(list_array), Arc::new(map_array)],
            None,
        );

        let fields = Fields::from(vec![Field::new("a", DataType::Struct(struct_fields), true)]);
        let schema = Arc::new(Schema::new(fields.clone()));
        let expected = RecordBatch::try_new(schema.clone(), vec![Arc::new(struct_array)]).unwrap();

        let parse = |row: &str, struct_mode: StructMode| {
            _parse_structs(row, struct_mode, fields.clone(), false)
        };

        // ObjectOnly accepts only the object encoding; list forms error and
        // echo the offending value (as printed from the tape).
        assert_eq!(
            parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_list_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ObjectOnly)
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': expected { got [[1, 2], {\"d\": 3}]".to_owned()
        );

        // ListOnly is the mirror image: only the list encoding decodes.
        assert_eq!(
            parse(nested_list_json, StructMode::ListOnly).unwrap(),
            expected
        );
        assert_eq!(
            parse(nested_object_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\": 3}}}".to_owned()
        );
        assert_eq!(
            parse(nested_mixed_json, StructMode::ListOnly)
                .unwrap_err()
                .to_string(),
            "Json error: expected [ got {\"a\": [[1, 2], {\"d\": 3}]}".to_owned()
        );
    }
3067
3068 #[test]
3074 fn test_struct_decoding_empty_list() {
3075 let int_field = Field::new("a", DataType::Int32, true);
3076 let struct_field = Field::new(
3077 "r",
3078 DataType::Struct(Fields::from(vec![int_field.clone()])),
3079 true,
3080 );
3081
3082 let parse = |row: &str, as_struct: bool, field: Field| {
3083 _parse_structs(
3084 row,
3085 StructMode::ListOnly,
3086 Fields::from(vec![field]),
3087 as_struct,
3088 )
3089 };
3090
3091 assert_eq!(
3093 parse("[]", true, struct_field.clone())
3094 .unwrap_err()
3095 .to_string(),
3096 "Json error: found 0 columns for 1 fields".to_owned()
3097 );
3098 assert_eq!(
3099 parse("[]", false, int_field.clone())
3100 .unwrap_err()
3101 .to_string(),
3102 "Json error: found 0 columns for 1 fields".to_owned()
3103 );
3104 assert_eq!(
3105 parse("[]", false, struct_field.clone())
3106 .unwrap_err()
3107 .to_string(),
3108 "Json error: found 0 columns for 1 fields".to_owned()
3109 );
3110 assert_eq!(
3111 parse("[[]]", false, struct_field.clone())
3112 .unwrap_err()
3113 .to_string(),
3114 "Json error: whilst decoding field 'r': found 0 columns for 1 fields".to_owned()
3115 );
3116 }
3117
    #[test]
    fn test_decode_list_struct_with_wrong_types() {
        // A string where an Int32 is expected must fail with a parse error
        // that names every enclosing field on the path to the bad value.
        let int_field = Field::new("a", DataType::Int32, true);
        let struct_field = Field::new(
            "r",
            DataType::Struct(Fields::from(vec![int_field.clone()])),
            true,
        );

        let parse = |row: &str, as_struct: bool, field: Field| {
            _parse_structs(
                row,
                StructMode::ListOnly,
                Fields::from(vec![field]),
                as_struct,
            )
        };

        assert_eq!(
            parse(r#"[["a"]]"#, false, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"[["a"]]"#, true, struct_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'r': whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"["a"]"#, true, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
        assert_eq!(
            parse(r#"["a"]"#, false, int_field.clone())
                .unwrap_err()
                .to_string(),
            "Json error: whilst decoding field 'a': failed to parse \"a\" as Int32".to_owned()
        );
    }
3162
    #[test]
    fn test_type_conflict_nulls() {
        // One nullable column per decoder family. Each row rotates the
        // candidate values so every column sees every value type exactly
        // once; with ignore_type_conflicts, mismatches become nulls.
        let schema = Schema::new(vec![
            Field::new("null", DataType::Null, true),
            Field::new("bool", DataType::Boolean, true),
            Field::new("primitive", DataType::Int32, true),
            Field::new("numeric", DataType::Decimal128(10, 3), true),
            Field::new("string", DataType::Utf8, true),
            Field::new("string_view", DataType::Utf8View, true),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Second, None),
                true,
            ),
            Field::new(
                "array",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new(
                "map",
                DataType::Map(
                    Arc::new(Field::new(
                        "entries",
                        DataType::Struct(Fields::from(vec![
                            Field::new("keys", DataType::Utf8, false),
                            Field::new("values", DataType::Utf8, true),
                        ])),
                        false,
                    )),
                    false,
                ),
                true,
            ),
            Field::new(
                "struct",
                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
                true,
            ),
        ]);

        // One value per column type, in the same order as the schema above.
        let json_values = vec![
            json!(null),
            json!(true),
            json!(42),
            json!(1.234),
            json!("hi"),
            json!("ho"),
            json!("1970-01-01T00:00:00+02:00"),
            json!([1, "ho", 3]),
            json!({"k": "value"}),
            json!({"a": 1}),
        ];

        // Row i pairs each column with the values rotated by i positions.
        let json: Vec<_> = (0..json_values.len())
            .map(|i| {
                let pairs = json_values[i..]
                    .iter()
                    .chain(json_values[..i].iter())
                    .zip(&schema.fields)
                    .map(|(v, f)| (f.name().to_string(), v.clone()))
                    .collect();
                serde_json::Value::Object(pairs)
            })
            .collect();
        let mut decoder = ReaderBuilder::new(Arc::new(schema))
            .with_ignore_type_conflicts(true)
            .with_coerce_primitive(true)
            .build_decoder()
            .unwrap();
        decoder.serialize(&json).unwrap();
        let batch = decoder.flush().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 10);
        assert_eq!(batch.num_columns(), 10);

        // The Null column accepts anything (all slots null by construction).
        let _ = batch
            .column(0)
            .as_any()
            .downcast_ref::<NullArray>()
            .unwrap();

        // Boolean: only row 0's `true` matches; every conflict became null.
        assert!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<BooleanArray>()
                .unwrap()
                .iter()
                .eq([
                    Some(true),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    None
                ])
        );

        // Int32: 42 matches directly, and 1.234 truncates to 1 (coercion).
        assert!(batch.column(2).as_primitive::<Int32Type>().iter().eq([
            Some(42),
            Some(1),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None
        ]));

        // Decimal128(10, 3): 1.234 -> 1234 and (rotated) 42 -> 42000.
        assert!(batch.column(3).as_primitive::<Decimal128Type>().iter().eq([
            Some(1234),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            Some(42000)
        ]));

        // Utf8: strings pass through; booleans/numbers coerce to text.
        assert!(
            batch
                .column(4)
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap()
                .iter()
                .eq([
                    Some("hi"),
                    Some("ho"),
                    Some("1970-01-01T00:00:00+02:00"),
                    None,
                    None,
                    None,
                    None,
                    Some("true"),
                    Some("42"),
                    Some("1.234"),
                ])
        );

        // Utf8View behaves like Utf8, shifted by one rotation step.
        assert!(
            batch
                .column(5)
                .as_any()
                .downcast_ref::<StringViewArray>()
                .unwrap()
                .iter()
                .eq([
                    Some("ho"),
                    Some("1970-01-01T00:00:00+02:00"),
                    None,
                    None,
                    None,
                    None,
                    Some("true"),
                    Some("42"),
                    Some("1.234"),
                    Some("hi"),
                ])
        );

        // Timestamp: the RFC3339 string parses (-7200s) and 42 coerces.
        assert!(
            batch
                .column(6)
                .as_primitive::<TimestampSecondType>()
                .iter()
                .eq([
                    Some(-7200),
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(42),
                    None,
                    None,
                    None,
                ])
        );

        // List<Int32>: only row 0's array matches; "ho" inside it is a
        // nested conflict and becomes a null element.
        let arrays = batch
            .column(7)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(
            arrays.nulls(),
            Some(&NullBuffer::from(
                &[
                    true, false, false, false, false, false, false, false, false, false
                ][..]
            ))
        );
        assert_eq!(arrays.offsets()[1], 3);
        let array_values = arrays
            .values()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(array_values.iter().eq([Some(1), None, Some(3)]));

        // Map: both JSON objects decode as maps (rows 0 and 1); the struct
        // object's int value coerces to the string "1".
        let maps = batch.column(8).as_any().downcast_ref::<MapArray>().unwrap();
        assert_eq!(
            maps.nulls(),
            Some(&NullBuffer::from(
                &[
                    true, true, false, false, false, false, false, false, false, false
                ][..]
            ))
        );
        let map_keys = maps.keys().as_any().downcast_ref::<StringArray>().unwrap();
        assert!(map_keys.iter().eq([Some("k"), Some("a")]));
        let map_values = maps
            .values()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(map_values.iter().eq([Some("value"), Some("1")]));

        // Struct: matches both objects ({"a": 1} in rows 0 and 9); only the
        // one with an "a" key yields a non-null child value.
        let structs = batch
            .column(9)
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();
        assert_eq!(
            structs.nulls(),
            Some(&NullBuffer::from(
                &[
                    true, false, false, false, false, false, false, false, false, true
                ][..]
            ))
        );
        let struct_fields = structs
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(struct_fields.slice(0, 2).iter().eq([Some(1), None]));
    }
3416
    #[test]
    fn test_type_conflict_non_nullable() {
        // With non-nullable fields, ignoring a type conflict would require
        // writing a null — so the flush must fail for every target type.
        let fields = [
            Field::new("bool", DataType::Boolean, false),
            Field::new("primitive", DataType::Int32, false),
            Field::new("numeric", DataType::Decimal128(10, 3), false),
            Field::new("string", DataType::Utf8, false),
            Field::new("string_view", DataType::Utf8View, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Second, None),
                false,
            ),
            Field::new(
                "array",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                false,
            ),
            Field::new(
                "fixed_size_list",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 2),
                false,
            ),
            Field::new(
                "map",
                DataType::Map(
                    Arc::new(Field::new(
                        "entries",
                        DataType::Struct(Fields::from(vec![
                            Field::new("keys", DataType::Utf8, false),
                            Field::new("values", DataType::Utf8, true),
                        ])),
                        false,
                    )),
                    false,
                ),
                false,
            ),
            Field::new(
                "struct",
                DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
                false,
            ),
        ];

        // A boolean conflicts with everything but Boolean; an object conflicts
        // with Boolean — so every field above sees at least one conflict.
        let json_values = vec![json!(true), json!({"a": 1})];

        for field in fields {
            let mut decoder = ReaderBuilder::new_with_field(field)
                .with_ignore_type_conflicts(true)
                .build_decoder()
                .unwrap();
            decoder.serialize(&json_values).unwrap();
            decoder
                .flush()
                .expect_err("type conflict on non-nullable type");
        }
    }
3476
3477 #[test]
3478 fn test_ignore_type_conflicts_disabled() {
3479 let fields = [
3480 Field::new("null", DataType::Null, true),
3481 Field::new("bool", DataType::Boolean, true),
3482 Field::new("primitive", DataType::Int32, true),
3483 Field::new("numeric", DataType::Decimal128(10, 3), true),
3484 Field::new("string", DataType::Utf8, true),
3485 Field::new("string_view", DataType::Utf8View, true),
3486 Field::new(
3487 "timestamp",
3488 DataType::Timestamp(TimeUnit::Second, None),
3489 true,
3490 ),
3491 Field::new(
3492 "array",
3493 DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
3494 true,
3495 ),
3496 Field::new(
3497 "fixed_size_list",
3498 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 2),
3499 true,
3500 ),
3501 Field::new(
3502 "map",
3503 DataType::Map(
3504 Arc::new(Field::new(
3505 "entries",
3506 DataType::Struct(Fields::from(vec![
3507 Field::new("keys", DataType::Utf8, false),
3508 Field::new("values", DataType::Utf8, true),
3509 ])),
3510 false, )),
3512 false, ),
3514 true, ),
3516 Field::new(
3517 "struct",
3518 DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, true)])),
3519 true,
3520 ),
3521 ];
3522
3523 let json_values = vec![json!(true), json!({"a": 1})];
3525
3526 for field in fields {
3527 let mut decoder = ReaderBuilder::new_with_field(field)
3528 .build_decoder()
3529 .unwrap();
3530 decoder.serialize(&json_values).unwrap();
3531 decoder
3532 .flush()
3533 .expect_err("type conflict on non-nullable type");
3534 }
3535 }
3536
    #[test]
    fn test_read_run_end_encoded() {
        // Consecutive equal values collapse into runs: "x","x","y","y","y"
        // becomes run_ends [2, 5] over values ["x", "y"].
        let buf = r#"
        {"a": "x"}
        {"a": "x"}
        {"a": "y"}
        {"a": "y"}
        {"a": "y"}
        "#;

        let ree_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int32, false)),
            Arc::new(Field::new("values", DataType::Utf8, true)),
        );
        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0);
        let run_array = col.as_run::<arrow_array::types::Int32Type>();

        // Logical length is 5 rows, physically stored as two runs.
        assert_eq!(run_array.len(), 5);
        assert_eq!(run_array.run_ends().values(), &[2, 5]);

        let values = run_array.values().as_string::<i32>();
        assert_eq!(values.len(), 2);
        assert_eq!(values.value(0), "x");
        assert_eq!(values.value(1), "y");
    }
3567
    #[test]
    fn test_read_run_end_encoded_consecutive_nulls() {
        // Missing fields decode as nulls; consecutive nulls merge into a
        // single run with a null value slot.
        let buf = r#"
        {"a": "x"}
        {}
        {}
        {}
        {"a": "y"}
        "#;

        let ree_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int32, false)),
            Arc::new(Field::new("values", DataType::Utf8, true)),
        );
        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0);
        let run_array = col.as_run::<arrow_array::types::Int32Type>();

        // Runs: "x" (1 row), null (3 rows), "y" (1 row).
        assert_eq!(run_array.len(), 5);
        assert_eq!(run_array.run_ends().values(), &[1, 4, 5]);

        let values = run_array.values().as_string::<i32>();
        assert_eq!(values.len(), 3);
        assert_eq!(values.value(0), "x");
        assert!(values.is_null(1));
        assert_eq!(values.value(2), "y");
    }
3599
    #[test]
    fn test_read_run_end_encoded_all_unique() {
        // Degenerate case: no value repeats, so every run has length 1.
        let buf = r#"
        {"a": 1}
        {"a": 2}
        {"a": 3}
        "#;

        let ree_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int32, false)),
            Arc::new(Field::new("values", DataType::Int32, true)),
        );
        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0);
        let run_array = col.as_run::<arrow_array::types::Int32Type>();

        assert_eq!(run_array.len(), 3);
        assert_eq!(run_array.run_ends().values(), &[1, 2, 3]);
    }
3623
    #[test]
    fn test_read_run_end_encoded_int16_run_ends() {
        // The run-ends type is configurable; Int16 run ends must work too.
        let buf = r#"
        {"a": "x"}
        {"a": "x"}
        {"a": "y"}
        "#;

        let ree_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int16, false)),
            Arc::new(Field::new("values", DataType::Utf8, true)),
        );
        let schema = Arc::new(Schema::new(vec![Field::new("a", ree_type, true)]));
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0);
        let run_array = col.as_run::<arrow_array::types::Int16Type>();

        assert_eq!(run_array.len(), 3);
        assert_eq!(run_array.run_ends().values(), &[2i16, 3]);
    }
3646
    #[test]
    fn test_read_nested_run_end_encoded() {
        // A run-end-encoded array whose values child is itself run-end
        // encoded: both levels of run ends must decode.
        let buf = r#"
        {"a": "x"}
        {"a": "x"}
        {"a": "y"}
        "#;

        let inner_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int64, false)),
            Arc::new(Field::new("values", DataType::Utf8, true)),
        );
        let outer_type = DataType::RunEndEncoded(
            Arc::new(Field::new("run_ends", DataType::Int64, false)),
            Arc::new(Field::new("values", inner_type, true)),
        );
        let schema = Arc::new(Schema::new(vec![Field::new("a", outer_type, true)]));
        let batches = do_read(buf, 1024, false, false, schema);
        assert_eq!(batches.len(), 1);

        let col = batches[0].column(0);
        // Outer level: 3 logical rows as two runs ("x" x2, "y" x1).
        let outer = col.as_run::<arrow_array::types::Int64Type>();
        assert_eq!(outer.len(), 3);
        assert_eq!(outer.run_ends().values(), &[2, 3]);

        // Inner level: the two distinct values, each a run of length 1.
        let nested = outer.values().as_run::<arrow_array::types::Int64Type>();
        assert_eq!(nested.len(), 2);
        assert_eq!(nested.run_ends().values(), &[1, 2]);

        let nested_values = nested.values().as_string::<i32>();
        assert_eq!(nested_values.len(), 2);
        assert_eq!(nested_values.value(0), "x");
        assert_eq!(nested_values.value(1), "y");
    }
3685}