1use std::{collections::HashMap, fmt, sync::Arc};
22
23use crate::basic::{ConvertedType, Repetition};
24use crate::errors::{ParquetError, Result};
25use crate::file::reader::{FileReader, RowGroupReader};
26use crate::record::{
27 api::{make_list, make_map, Field, Row},
28 triplet::TripletIter,
29};
30use crate::schema::types::{ColumnPath, SchemaDescPtr, SchemaDescriptor, Type, TypePtr};
31
/// Batch size handed to every column's `TripletIter` when no explicit batch
/// size has been configured on the `TreeBuilder` (2^10 = 1024 values).
const DEFAULT_BATCH_SIZE: usize = 1 << 10;
34
/// Builder of `Reader` trees that turn the columns of a row group into `Row`s.
#[derive(Debug, Clone)]
pub struct TreeBuilder {
    /// Batch size passed to every column's `TripletIter`.
    batch_size: usize,
}
42
43impl Default for TreeBuilder {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49impl TreeBuilder {
50 pub fn new() -> Self {
52 Self {
53 batch_size: DEFAULT_BATCH_SIZE,
54 }
55 }
56
57 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
59 self.batch_size = batch_size;
60 self
61 }
62
63 pub fn build(
65 &self,
66 descr: SchemaDescPtr,
67 row_group_reader: &dyn RowGroupReader,
68 ) -> Result<Reader> {
69 let mut paths: HashMap<ColumnPath, usize> = HashMap::new();
72 let row_group_metadata = row_group_reader.metadata();
73
74 for col_index in 0..row_group_reader.num_columns() {
75 let col_meta = row_group_metadata.column(col_index);
76 let col_path = col_meta.column_path().clone();
77 paths.insert(col_path, col_index);
78 }
79
80 let mut readers = Vec::new();
82 let mut path = Vec::new();
83
84 for field in descr.root_schema().get_fields() {
85 let reader =
86 self.reader_tree(field.clone(), &mut path, 0, 0, &paths, row_group_reader)?;
87 readers.push(reader);
88 }
89
90 Ok(Reader::GroupReader(None, 0, readers))
93 }
94
95 pub fn as_iter(
97 &self,
98 descr: SchemaDescPtr,
99 row_group_reader: &dyn RowGroupReader,
100 ) -> Result<ReaderIter> {
101 let num_records = row_group_reader.metadata().num_rows() as usize;
102 ReaderIter::new(self.build(descr, row_group_reader)?, num_records)
103 }
104
105 fn reader_tree(
107 &self,
108 field: TypePtr,
109 path: &mut Vec<String>,
110 mut curr_def_level: i16,
111 mut curr_rep_level: i16,
112 paths: &HashMap<ColumnPath, usize>,
113 row_group_reader: &dyn RowGroupReader,
114 ) -> Result<Reader> {
115 assert!(field.get_basic_info().has_repetition());
116 let repetition = field.get_basic_info().repetition();
118 match repetition {
119 Repetition::OPTIONAL => {
120 curr_def_level += 1;
121 }
122 Repetition::REPEATED => {
123 curr_def_level += 1;
124 curr_rep_level += 1;
125 }
126 _ => {}
127 }
128
129 path.push(String::from(field.name()));
130 let reader = if field.is_primitive() {
131 let col_path = ColumnPath::new(path.to_vec());
132 let orig_index = *paths
133 .get(&col_path)
134 .ok_or(general_err!("Path {:?} not found", col_path))?;
135 let col_descr = row_group_reader
136 .metadata()
137 .column(orig_index)
138 .column_descr_ptr();
139 let col_reader = row_group_reader.get_column_reader(orig_index)?;
140 let column = TripletIter::new(col_descr, col_reader, self.batch_size);
141 let reader = Reader::PrimitiveReader(field.clone(), Box::new(column));
142 if repetition == Repetition::REPEATED {
143 Reader::RepeatedReader(
144 field,
145 curr_def_level - 1,
146 curr_rep_level - 1,
147 Box::new(reader),
148 )
149 } else {
150 reader
151 }
152 } else {
153 match field.get_basic_info().converted_type() {
154 ConvertedType::LIST => {
156 assert_eq!(field.get_fields().len(), 1, "Invalid list type {field:?}");
157
158 let repeated_field = field.get_fields()[0].clone();
159 assert_eq!(
160 repeated_field.get_basic_info().repetition(),
161 Repetition::REPEATED,
162 "Invalid list type {field:?}"
163 );
164
165 if Reader::is_element_type(&repeated_field) {
166 let reader = self.reader_tree(
168 repeated_field,
169 path,
170 curr_def_level,
171 curr_rep_level,
172 paths,
173 row_group_reader,
174 )?;
175
176 Reader::RepeatedReader(
177 field,
178 curr_def_level,
179 curr_rep_level,
180 Box::new(reader),
181 )
182 } else {
183 let child_field = repeated_field.get_fields()[0].clone();
184
185 path.push(String::from(repeated_field.name()));
186
187 let reader = self.reader_tree(
188 child_field,
189 path,
190 curr_def_level + 1,
191 curr_rep_level + 1,
192 paths,
193 row_group_reader,
194 )?;
195
196 path.pop();
197
198 Reader::RepeatedReader(
199 field,
200 curr_def_level,
201 curr_rep_level,
202 Box::new(reader),
203 )
204 }
205 }
206 ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
208 assert_eq!(field.get_fields().len(), 1, "Invalid map type: {field:?}");
209 assert!(
210 !field.get_fields()[0].is_primitive(),
211 "Invalid map type: {field:?}"
212 );
213
214 let key_value_type = field.get_fields()[0].clone();
215 assert_eq!(
216 key_value_type.get_basic_info().repetition(),
217 Repetition::REPEATED,
218 "Invalid map type: {field:?}"
219 );
220 if key_value_type.get_fields().len() != 1 {
222 assert_eq!(
224 key_value_type.get_fields().len(),
225 2,
226 "Invalid map type: {field:?}"
227 );
228 }
229
230 path.push(String::from(key_value_type.name()));
231
232 let key_type = &key_value_type.get_fields()[0];
233 assert!(
234 key_type.is_primitive(),
235 "Map key type is expected to be a primitive type, but found {key_type:?}"
236 );
237 let key_reader = self.reader_tree(
238 key_type.clone(),
239 path,
240 curr_def_level + 1,
241 curr_rep_level + 1,
242 paths,
243 row_group_reader,
244 )?;
245
246 if key_value_type.get_fields().len() == 1 {
247 path.pop();
248 Reader::RepeatedReader(
249 field,
250 curr_def_level,
251 curr_rep_level,
252 Box::new(key_reader),
253 )
254 } else {
255 let value_type = &key_value_type.get_fields()[1];
256 let value_reader = self.reader_tree(
257 value_type.clone(),
258 path,
259 curr_def_level + 1,
260 curr_rep_level + 1,
261 paths,
262 row_group_reader,
263 )?;
264
265 path.pop();
266
267 Reader::KeyValueReader(
268 field,
269 curr_def_level,
270 curr_rep_level,
271 Box::new(key_reader),
272 Box::new(value_reader),
273 )
274 }
275 }
276 _ if repetition == Repetition::REPEATED => {
281 let required_field = Type::group_type_builder(field.name())
282 .with_repetition(Repetition::REQUIRED)
283 .with_converted_type(field.get_basic_info().converted_type())
284 .with_fields(field.get_fields().to_vec())
285 .build()?;
286
287 path.pop();
288
289 let reader = self.reader_tree(
290 Arc::new(required_field),
291 path,
292 curr_def_level,
293 curr_rep_level,
294 paths,
295 row_group_reader,
296 )?;
297
298 return Ok(Reader::RepeatedReader(
299 field,
300 curr_def_level - 1,
301 curr_rep_level - 1,
302 Box::new(reader),
303 ));
304 }
305 _ => {
307 let mut readers = Vec::new();
308 for child in field.get_fields() {
309 let reader = self.reader_tree(
310 child.clone(),
311 path,
312 curr_def_level,
313 curr_rep_level,
314 paths,
315 row_group_reader,
316 )?;
317 readers.push(reader);
318 }
319 Reader::GroupReader(Some(field), curr_def_level, readers)
320 }
321 }
322 };
323 path.pop();
324
325 Ok(Reader::option(repetition, curr_def_level, reader))
326 }
327}
328
/// Node of a reader tree: reads the value(s) of one schema node from the
/// underlying column(s) and assembles them into `Row`s / `Field`s.
pub enum Reader {
    /// Leaf reader: the primitive field's type and the triplet iterator over
    /// its column.
    PrimitiveReader(TypePtr, Box<TripletIter>),
    /// Reader for an `OPTIONAL` field: a definition-level threshold and the
    /// inner reader. The inner value is read when the current definition level
    /// is greater than the threshold; otherwise the field reads as `Field::Null`.
    OptionReader(i16, Box<Reader>),
    /// Reader for a group: the group type (`None` for the schema root), the
    /// group's definition level, and one reader per child field.
    GroupReader(Option<TypePtr>, i16, Vec<Reader>),
    /// Reader for a repeated value, producing `Field::ListInternal`: the field
    /// type, definition level, repetition level, and the element reader.
    RepeatedReader(TypePtr, i16, i16, Box<Reader>),
    /// Reader for a map, producing `Field::MapInternal`: the map type,
    /// definition level, repetition level, and the key and value readers.
    KeyValueReader(TypePtr, i16, i16, Box<Reader>, Box<Reader>),
}
345
346impl Reader {
347 fn option(repetition: Repetition, def_level: i16, reader: Reader) -> Self {
349 if repetition == Repetition::OPTIONAL {
350 Reader::OptionReader(def_level - 1, Box::new(reader))
351 } else {
352 reader
353 }
354 }
355
356 fn is_element_type(repeated_type: &Type) -> bool {
362 if repeated_type.is_list() || repeated_type.has_single_repeated_child() {
372 return false;
373 }
374
375 repeated_type.is_primitive() ||
383 repeated_type.is_group() && repeated_type.get_fields().len() > 1 ||
395 repeated_type.name() == "array" ||
406 repeated_type.name().ends_with("_tuple")
416 }
417
418 fn read(&mut self) -> Result<Row> {
423 match *self {
424 Reader::GroupReader(_, _, ref mut readers) => {
425 let mut fields = Vec::new();
426 for reader in readers {
427 fields.push((String::from(reader.field_name()), reader.read_field()?));
428 }
429 Ok(Row::new(fields))
430 }
431 _ => panic!("Cannot call read() on {self}"),
432 }
433 }
434
435 fn read_field(&mut self) -> Result<Field> {
438 let field = match *self {
439 Reader::PrimitiveReader(_, ref mut column) => {
440 let value = column.current_value()?;
441 column.read_next()?;
442 value
443 }
444 Reader::OptionReader(def_level, ref mut reader) => {
445 if reader.current_def_level() > def_level {
446 reader.read_field()?
447 } else {
448 reader.advance_columns()?;
449 Field::Null
450 }
451 }
452 Reader::GroupReader(_, def_level, ref mut readers) => {
453 let mut fields = Vec::new();
454 for reader in readers {
455 if reader.repetition() != Repetition::OPTIONAL
456 || reader.current_def_level() > def_level
457 {
458 fields.push((String::from(reader.field_name()), reader.read_field()?));
459 } else {
460 reader.advance_columns()?;
461 fields.push((String::from(reader.field_name()), Field::Null));
462 }
463 }
464 let row = Row::new(fields);
465 Field::Group(row)
466 }
467 Reader::RepeatedReader(_, def_level, rep_level, ref mut reader) => {
468 let mut elements = Vec::new();
469 loop {
470 if reader.current_def_level() > def_level {
471 elements.push(reader.read_field()?);
472 } else {
473 reader.advance_columns()?;
474 break;
479 }
480
481 if !reader.has_next() || reader.current_rep_level() <= rep_level {
485 break;
486 }
487 }
488 Field::ListInternal(make_list(elements))
489 }
490 Reader::KeyValueReader(_, def_level, rep_level, ref mut keys, ref mut values) => {
491 let mut pairs = Vec::new();
492 loop {
493 if keys.current_def_level() > def_level {
494 pairs.push((keys.read_field()?, values.read_field()?));
495 } else {
496 keys.advance_columns()?;
497 values.advance_columns()?;
498 break;
503 }
504
505 if !keys.has_next() || keys.current_rep_level() <= rep_level {
509 break;
510 }
511 }
512
513 Field::MapInternal(make_map(pairs))
514 }
515 };
516 Ok(field)
517 }
518
519 fn field_name(&self) -> &str {
521 match *self {
522 Reader::PrimitiveReader(ref field, _) => field.name(),
523 Reader::OptionReader(_, ref reader) => reader.field_name(),
524 Reader::GroupReader(ref opt, ..) => match opt {
525 Some(ref field) => field.name(),
526 None => panic!("Field is None for group reader"),
527 },
528 Reader::RepeatedReader(ref field, ..) => field.name(),
529 Reader::KeyValueReader(ref field, ..) => field.name(),
530 }
531 }
532
533 fn repetition(&self) -> Repetition {
535 match *self {
536 Reader::PrimitiveReader(ref field, _) => field.get_basic_info().repetition(),
537 Reader::OptionReader(_, ref reader) => reader.repetition(),
538 Reader::GroupReader(ref opt, ..) => match opt {
539 Some(ref field) => field.get_basic_info().repetition(),
540 None => panic!("Field is None for group reader"),
541 },
542 Reader::RepeatedReader(ref field, ..) => field.get_basic_info().repetition(),
543 Reader::KeyValueReader(ref field, ..) => field.get_basic_info().repetition(),
544 }
545 }
546
547 fn has_next(&self) -> bool {
550 match *self {
551 Reader::PrimitiveReader(_, ref column) => column.has_next(),
552 Reader::OptionReader(_, ref reader) => reader.has_next(),
553 Reader::GroupReader(_, _, ref readers) => readers.first().unwrap().has_next(),
554 Reader::RepeatedReader(_, _, _, ref reader) => reader.has_next(),
555 Reader::KeyValueReader(_, _, _, ref keys, _) => keys.has_next(),
556 }
557 }
558
559 fn current_def_level(&self) -> i16 {
562 match *self {
563 Reader::PrimitiveReader(_, ref column) => column.current_def_level(),
564 Reader::OptionReader(_, ref reader) => reader.current_def_level(),
565 Reader::GroupReader(_, _, ref readers) => match readers.first() {
566 Some(reader) => reader.current_def_level(),
567 None => panic!("Current definition level: empty group reader"),
568 },
569 Reader::RepeatedReader(_, _, _, ref reader) => reader.current_def_level(),
570 Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_def_level(),
571 }
572 }
573
574 fn current_rep_level(&self) -> i16 {
577 match *self {
578 Reader::PrimitiveReader(_, ref column) => column.current_rep_level(),
579 Reader::OptionReader(_, ref reader) => reader.current_rep_level(),
580 Reader::GroupReader(_, _, ref readers) => match readers.first() {
581 Some(reader) => reader.current_rep_level(),
582 None => panic!("Current repetition level: empty group reader"),
583 },
584 Reader::RepeatedReader(_, _, _, ref reader) => reader.current_rep_level(),
585 Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_rep_level(),
586 }
587 }
588
589 fn advance_columns(&mut self) -> Result<()> {
591 match *self {
592 Reader::PrimitiveReader(_, ref mut column) => column.read_next().map(|_| ()),
593 Reader::OptionReader(_, ref mut reader) => reader.advance_columns(),
594 Reader::GroupReader(_, _, ref mut readers) => {
595 for reader in readers {
596 reader.advance_columns()?;
597 }
598 Ok(())
599 }
600 Reader::RepeatedReader(_, _, _, ref mut reader) => reader.advance_columns(),
601 Reader::KeyValueReader(_, _, _, ref mut keys, ref mut values) => {
602 keys.advance_columns()?;
603 values.advance_columns()
604 }
605 }
606 }
607}
608
609impl fmt::Display for Reader {
610 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
611 let s = match self {
612 Reader::PrimitiveReader(..) => "PrimitiveReader",
613 Reader::OptionReader(..) => "OptionReader",
614 Reader::GroupReader(..) => "GroupReader",
615 Reader::RepeatedReader(..) => "RepeatedReader",
616 Reader::KeyValueReader(..) => "KeyValueReader",
617 };
618 write!(f, "{s}")
619 }
620}
621
622enum Either<'a> {
628 Left(&'a dyn FileReader),
629 Right(Box<dyn FileReader>),
630}
631
632impl Either<'_> {
633 fn reader(&self) -> &dyn FileReader {
634 match *self {
635 Either::Left(r) => r,
636 Either::Right(ref r) => &**r,
637 }
638 }
639}
640
/// Iterator of `Row`s over either a whole file or a single row group.
///
/// When backed by a file reader, a new `ReaderIter` is built for each row
/// group in turn once the previous one is exhausted.
pub struct RowIter<'a> {
    /// Schema (possibly a projection) used to build the reader trees.
    descr: SchemaDescPtr,
    /// Builder for the per-row-group reader trees.
    tree_builder: TreeBuilder,
    /// Source file; `None` for an iterator created from a single row group.
    file_reader: Option<Either<'a>>,
    /// Index of the next row group to open.
    current_row_group: usize,
    /// Number of row groups in the file (0 when there is no file reader).
    num_row_groups: usize,
    /// Iterator over the row group currently being read, if any.
    row_iter: Option<ReaderIter>,
}
663
664impl<'a> RowIter<'a> {
665 fn new(
667 file_reader: Option<Either<'a>>,
668 row_iter: Option<ReaderIter>,
669 descr: SchemaDescPtr,
670 ) -> Self {
671 let tree_builder = Self::tree_builder();
672 let num_row_groups = match file_reader {
673 Some(ref r) => r.reader().num_row_groups(),
674 None => 0,
675 };
676
677 Self {
678 descr,
679 file_reader,
680 tree_builder,
681 num_row_groups,
682 row_iter,
683 current_row_group: 0,
684 }
685 }
686
687 pub fn from_file(proj: Option<Type>, reader: &'a dyn FileReader) -> Result<Self> {
690 let either = Either::Left(reader);
691 let descr =
692 Self::get_proj_descr(proj, reader.metadata().file_metadata().schema_descr_ptr())?;
693
694 Ok(Self::new(Some(either), None, descr))
695 }
696
697 pub fn from_row_group(proj: Option<Type>, reader: &'a dyn RowGroupReader) -> Result<Self> {
699 let descr = Self::get_proj_descr(proj, reader.metadata().schema_descr_ptr())?;
700 let tree_builder = Self::tree_builder();
701 let row_iter = tree_builder.as_iter(descr.clone(), reader)?;
702
703 Ok(Self::new(None, Some(row_iter), descr))
706 }
707
708 pub fn from_file_into(reader: Box<dyn FileReader>) -> Self {
710 let either = Either::Right(reader);
711 let descr = either
712 .reader()
713 .metadata()
714 .file_metadata()
715 .schema_descr_ptr();
716
717 Self::new(Some(either), None, descr)
718 }
719
720 pub fn project(self, proj: Option<Type>) -> Result<Self> {
726 match self.file_reader {
727 Some(ref either) => {
728 let schema = either
729 .reader()
730 .metadata()
731 .file_metadata()
732 .schema_descr_ptr();
733 let descr = Self::get_proj_descr(proj, schema)?;
734
735 Ok(Self::new(self.file_reader, None, descr))
736 }
737 None => Err(general_err!("File reader is required to use projections")),
738 }
739 }
740
741 #[inline]
744 fn get_proj_descr(proj: Option<Type>, root_descr: SchemaDescPtr) -> Result<SchemaDescPtr> {
745 match proj {
746 Some(projection) => {
747 let root_schema = root_descr.root_schema();
749 if !root_schema.check_contains(&projection) {
750 return Err(general_err!("Root schema does not contain projection"));
751 }
752 Ok(Arc::new(SchemaDescriptor::new(Arc::new(projection))))
753 }
754 None => Ok(root_descr),
755 }
756 }
757
758 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
760 self.tree_builder = self.tree_builder.with_batch_size(batch_size);
761 self
762 }
763
764 #[inline]
767 fn tree_builder() -> TreeBuilder {
768 TreeBuilder::new()
769 }
770}
771
772impl Iterator for RowIter<'_> {
773 type Item = Result<Row>;
774
775 fn next(&mut self) -> Option<Result<Row>> {
776 let mut row = None;
777 if let Some(ref mut iter) = self.row_iter {
778 row = iter.next();
779 }
780
781 while row.is_none() && self.current_row_group < self.num_row_groups {
782 if let Some(ref either) = self.file_reader {
785 let file_reader = either.reader();
786 let row_group_reader = &*file_reader
787 .get_row_group(self.current_row_group)
788 .expect("Row group is required to advance");
789
790 match self
791 .tree_builder
792 .as_iter(self.descr.clone(), row_group_reader)
793 {
794 Ok(mut iter) => {
795 row = iter.next();
796
797 self.current_row_group += 1;
798 self.row_iter = Some(iter);
799 }
800 Err(e) => return Some(Err(e)),
801 }
802 }
803 }
804
805 row
806 }
807}
808
/// Iterator that reads a fixed number of records from a reader tree.
pub struct ReaderIter {
    /// Root of the reader tree; `read` is called on it once per record.
    root_reader: Reader,
    /// Number of records that remain to be read.
    records_left: usize,
}
814
815impl ReaderIter {
816 fn new(mut root_reader: Reader, num_records: usize) -> Result<Self> {
817 root_reader.advance_columns()?;
819 Ok(Self {
820 root_reader,
821 records_left: num_records,
822 })
823 }
824}
825
826impl Iterator for ReaderIter {
827 type Item = Result<Row>;
828
829 fn next(&mut self) -> Option<Result<Row>> {
830 if self.records_left > 0 {
831 self.records_left -= 1;
832 Some(self.root_reader.read())
833 } else {
834 None
835 }
836 }
837}
838
839#[cfg(test)]
840mod tests {
841 use super::*;
842
843 use crate::data_type::Int64Type;
844 use crate::file::reader::SerializedFileReader;
845 use crate::file::writer::SerializedFileWriter;
846 use crate::record::api::RowAccessor;
847 use crate::schema::parser::parse_message_type;
848 use crate::util::test_common::file_util::{get_test_file, get_test_path};
849 use bytes::Bytes;
850
851 macro_rules! row {
854 ($($e:tt)*) => {
855 {
856 Row::new(vec![$($e)*])
857 }
858 }
859 }
860
861 macro_rules! list {
862 ($($e:tt)*) => {
863 {
864 Field::ListInternal(make_list(vec![$($e)*]))
865 }
866 }
867 }
868
869 macro_rules! map {
870 ($($e:tt)*) => {
871 {
872 Field::MapInternal(make_map(vec![$($e)*]))
873 }
874 }
875 }
876
877 macro_rules! group {
878 ( $( $e:expr ), * ) => {
879 {
880 Field::Group(row!($( $e ), *))
881 }
882 }
883 }
884
885 #[test]
886 fn test_file_reader_rows_nulls() {
887 let rows = test_file_reader_rows("nulls.snappy.parquet", None).unwrap();
888 let expected_rows = vec![
889 row![(
890 "b_struct".to_string(),
891 group![("b_c_int".to_string(), Field::Null)]
892 )],
893 row![(
894 "b_struct".to_string(),
895 group![("b_c_int".to_string(), Field::Null)]
896 )],
897 row![(
898 "b_struct".to_string(),
899 group![("b_c_int".to_string(), Field::Null)]
900 )],
901 row![(
902 "b_struct".to_string(),
903 group![("b_c_int".to_string(), Field::Null)]
904 )],
905 row![(
906 "b_struct".to_string(),
907 group![("b_c_int".to_string(), Field::Null)]
908 )],
909 row![(
910 "b_struct".to_string(),
911 group![("b_c_int".to_string(), Field::Null)]
912 )],
913 row![(
914 "b_struct".to_string(),
915 group![("b_c_int".to_string(), Field::Null)]
916 )],
917 row![(
918 "b_struct".to_string(),
919 group![("b_c_int".to_string(), Field::Null)]
920 )],
921 ];
922 assert_eq!(rows, expected_rows);
923 }
924
925 #[test]
926 fn test_file_reader_rows_nonnullable() {
927 let rows = test_file_reader_rows("nonnullable.impala.parquet", None).unwrap();
928 let expected_rows = vec![row![
929 ("ID".to_string(), Field::Long(8)),
930 ("Int_Array".to_string(), list![Field::Int(-1)]),
931 (
932 "int_array_array".to_string(),
933 list![list![Field::Int(-1), Field::Int(-2)], list![]]
934 ),
935 (
936 "Int_Map".to_string(),
937 map![(Field::Str("k1".to_string()), Field::Int(-1))]
938 ),
939 (
940 "int_map_array".to_string(),
941 list![
942 map![],
943 map![(Field::Str("k1".to_string()), Field::Int(1))],
944 map![],
945 map![]
946 ]
947 ),
948 (
949 "nested_Struct".to_string(),
950 group![
951 ("a".to_string(), Field::Int(-1)),
952 ("B".to_string(), list![Field::Int(-1)]),
953 (
954 "c".to_string(),
955 group![(
956 "D".to_string(),
957 list![list![group![
958 ("e".to_string(), Field::Int(-1)),
959 ("f".to_string(), Field::Str("nonnullable".to_string()))
960 ]]]
961 )]
962 ),
963 ("G".to_string(), map![])
964 ]
965 )
966 ]];
967 assert_eq!(rows, expected_rows);
968 }
969
970 #[test]
971 fn test_file_reader_rows_nullable() {
972 let rows = test_file_reader_rows("nullable.impala.parquet", None).unwrap();
973 let expected_rows = vec![
974 row![
975 ("id".to_string(), Field::Long(1)),
976 (
977 "int_array".to_string(),
978 list![Field::Int(1), Field::Int(2), Field::Int(3)]
979 ),
980 (
981 "int_array_Array".to_string(),
982 list![
983 list![Field::Int(1), Field::Int(2)],
984 list![Field::Int(3), Field::Int(4)]
985 ]
986 ),
987 (
988 "int_map".to_string(),
989 map![
990 (Field::Str("k1".to_string()), Field::Int(1)),
991 (Field::Str("k2".to_string()), Field::Int(100))
992 ]
993 ),
994 (
995 "int_Map_Array".to_string(),
996 list![map![(Field::Str("k1".to_string()), Field::Int(1))]]
997 ),
998 (
999 "nested_struct".to_string(),
1000 group![
1001 ("A".to_string(), Field::Int(1)),
1002 ("b".to_string(), list![Field::Int(1)]),
1003 (
1004 "C".to_string(),
1005 group![(
1006 "d".to_string(),
1007 list![
1008 list![
1009 group![
1010 ("E".to_string(), Field::Int(10)),
1011 ("F".to_string(), Field::Str("aaa".to_string()))
1012 ],
1013 group![
1014 ("E".to_string(), Field::Int(-10)),
1015 ("F".to_string(), Field::Str("bbb".to_string()))
1016 ]
1017 ],
1018 list![group![
1019 ("E".to_string(), Field::Int(11)),
1020 ("F".to_string(), Field::Str("c".to_string()))
1021 ]]
1022 ]
1023 )]
1024 ),
1025 (
1026 "g".to_string(),
1027 map![(
1028 Field::Str("foo".to_string()),
1029 group![(
1030 "H".to_string(),
1031 group![("i".to_string(), list![Field::Double(1.1)])]
1032 )]
1033 )]
1034 )
1035 ]
1036 )
1037 ],
1038 row![
1039 ("id".to_string(), Field::Long(2)),
1040 (
1041 "int_array".to_string(),
1042 list![
1043 Field::Null,
1044 Field::Int(1),
1045 Field::Int(2),
1046 Field::Null,
1047 Field::Int(3),
1048 Field::Null
1049 ]
1050 ),
1051 (
1052 "int_array_Array".to_string(),
1053 list![
1054 list![Field::Null, Field::Int(1), Field::Int(2), Field::Null],
1055 list![Field::Int(3), Field::Null, Field::Int(4)],
1056 list![],
1057 Field::Null
1058 ]
1059 ),
1060 (
1061 "int_map".to_string(),
1062 map![
1063 (Field::Str("k1".to_string()), Field::Int(2)),
1064 (Field::Str("k2".to_string()), Field::Null)
1065 ]
1066 ),
1067 (
1068 "int_Map_Array".to_string(),
1069 list![
1070 map![
1071 (Field::Str("k3".to_string()), Field::Null),
1072 (Field::Str("k1".to_string()), Field::Int(1))
1073 ],
1074 Field::Null,
1075 map![]
1076 ]
1077 ),
1078 (
1079 "nested_struct".to_string(),
1080 group![
1081 ("A".to_string(), Field::Null),
1082 ("b".to_string(), list![Field::Null]),
1083 (
1084 "C".to_string(),
1085 group![(
1086 "d".to_string(),
1087 list![
1088 list![
1089 group![
1090 ("E".to_string(), Field::Null),
1091 ("F".to_string(), Field::Null)
1092 ],
1093 group![
1094 ("E".to_string(), Field::Int(10)),
1095 ("F".to_string(), Field::Str("aaa".to_string()))
1096 ],
1097 group![
1098 ("E".to_string(), Field::Null),
1099 ("F".to_string(), Field::Null)
1100 ],
1101 group![
1102 ("E".to_string(), Field::Int(-10)),
1103 ("F".to_string(), Field::Str("bbb".to_string()))
1104 ],
1105 group![
1106 ("E".to_string(), Field::Null),
1107 ("F".to_string(), Field::Null)
1108 ]
1109 ],
1110 list![
1111 group![
1112 ("E".to_string(), Field::Int(11)),
1113 ("F".to_string(), Field::Str("c".to_string()))
1114 ],
1115 Field::Null
1116 ],
1117 list![],
1118 Field::Null
1119 ]
1120 )]
1121 ),
1122 (
1123 "g".to_string(),
1124 map![
1125 (
1126 Field::Str("g1".to_string()),
1127 group![(
1128 "H".to_string(),
1129 group![(
1130 "i".to_string(),
1131 list![Field::Double(2.2), Field::Null]
1132 )]
1133 )]
1134 ),
1135 (
1136 Field::Str("g2".to_string()),
1137 group![("H".to_string(), group![("i".to_string(), list![])])]
1138 ),
1139 (Field::Str("g3".to_string()), Field::Null),
1140 (
1141 Field::Str("g4".to_string()),
1142 group![(
1143 "H".to_string(),
1144 group![("i".to_string(), Field::Null)]
1145 )]
1146 ),
1147 (
1148 Field::Str("g5".to_string()),
1149 group![("H".to_string(), Field::Null)]
1150 )
1151 ]
1152 )
1153 ]
1154 )
1155 ],
1156 row![
1157 ("id".to_string(), Field::Long(3)),
1158 ("int_array".to_string(), list![]),
1159 ("int_array_Array".to_string(), list![Field::Null]),
1160 ("int_map".to_string(), map![]),
1161 ("int_Map_Array".to_string(), list![Field::Null, Field::Null]),
1162 (
1163 "nested_struct".to_string(),
1164 group![
1165 ("A".to_string(), Field::Null),
1166 ("b".to_string(), Field::Null),
1167 ("C".to_string(), group![("d".to_string(), list![])]),
1168 ("g".to_string(), map![])
1169 ]
1170 )
1171 ],
1172 row![
1173 ("id".to_string(), Field::Long(4)),
1174 ("int_array".to_string(), Field::Null),
1175 ("int_array_Array".to_string(), list![]),
1176 ("int_map".to_string(), map![]),
1177 ("int_Map_Array".to_string(), list![]),
1178 (
1179 "nested_struct".to_string(),
1180 group![
1181 ("A".to_string(), Field::Null),
1182 ("b".to_string(), Field::Null),
1183 ("C".to_string(), group![("d".to_string(), Field::Null)]),
1184 ("g".to_string(), Field::Null)
1185 ]
1186 )
1187 ],
1188 row![
1189 ("id".to_string(), Field::Long(5)),
1190 ("int_array".to_string(), Field::Null),
1191 ("int_array_Array".to_string(), Field::Null),
1192 ("int_map".to_string(), map![]),
1193 ("int_Map_Array".to_string(), Field::Null),
1194 (
1195 "nested_struct".to_string(),
1196 group![
1197 ("A".to_string(), Field::Null),
1198 ("b".to_string(), Field::Null),
1199 ("C".to_string(), Field::Null),
1200 (
1201 "g".to_string(),
1202 map![(
1203 Field::Str("foo".to_string()),
1204 group![(
1205 "H".to_string(),
1206 group![(
1207 "i".to_string(),
1208 list![Field::Double(2.2), Field::Double(3.3)]
1209 )]
1210 )]
1211 )]
1212 )
1213 ]
1214 )
1215 ],
1216 row![
1217 ("id".to_string(), Field::Long(6)),
1218 ("int_array".to_string(), Field::Null),
1219 ("int_array_Array".to_string(), Field::Null),
1220 ("int_map".to_string(), Field::Null),
1221 ("int_Map_Array".to_string(), Field::Null),
1222 ("nested_struct".to_string(), Field::Null)
1223 ],
1224 row![
1225 ("id".to_string(), Field::Long(7)),
1226 ("int_array".to_string(), Field::Null),
1227 (
1228 "int_array_Array".to_string(),
1229 list![Field::Null, list![Field::Int(5), Field::Int(6)]]
1230 ),
1231 (
1232 "int_map".to_string(),
1233 map![
1234 (Field::Str("k1".to_string()), Field::Null),
1235 (Field::Str("k3".to_string()), Field::Null)
1236 ]
1237 ),
1238 ("int_Map_Array".to_string(), Field::Null),
1239 (
1240 "nested_struct".to_string(),
1241 group![
1242 ("A".to_string(), Field::Int(7)),
1243 (
1244 "b".to_string(),
1245 list![Field::Int(2), Field::Int(3), Field::Null]
1246 ),
1247 (
1248 "C".to_string(),
1249 group![(
1250 "d".to_string(),
1251 list![list![], list![Field::Null], Field::Null]
1252 )]
1253 ),
1254 ("g".to_string(), Field::Null)
1255 ]
1256 )
1257 ],
1258 ];
1259 assert_eq!(rows, expected_rows);
1260 }
1261
1262 #[test]
1263 fn test_file_reader_rows_projection() {
1264 let schema = "
1265 message spark_schema {
1266 REQUIRED DOUBLE c;
1267 REQUIRED INT32 b;
1268 }
1269 ";
1270 let schema = parse_message_type(schema).unwrap();
1271 let rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1272 let expected_rows = vec![
1273 row![
1274 ("c".to_string(), Field::Double(1.0)),
1275 ("b".to_string(), Field::Int(1))
1276 ],
1277 row![
1278 ("c".to_string(), Field::Double(1.0)),
1279 ("b".to_string(), Field::Int(1))
1280 ],
1281 row![
1282 ("c".to_string(), Field::Double(1.0)),
1283 ("b".to_string(), Field::Int(1))
1284 ],
1285 row![
1286 ("c".to_string(), Field::Double(1.0)),
1287 ("b".to_string(), Field::Int(1))
1288 ],
1289 row![
1290 ("c".to_string(), Field::Double(1.0)),
1291 ("b".to_string(), Field::Int(1))
1292 ],
1293 row![
1294 ("c".to_string(), Field::Double(1.0)),
1295 ("b".to_string(), Field::Int(1))
1296 ],
1297 ];
1298 assert_eq!(rows, expected_rows);
1299 }
1300
1301 #[test]
1302 fn test_iter_columns_in_row() {
1303 let r = row![
1304 ("c".to_string(), Field::Double(1.0)),
1305 ("b".to_string(), Field::Int(1))
1306 ];
1307 let mut result = Vec::new();
1308 for (name, record) in r.get_column_iter() {
1309 result.push((name, record));
1310 }
1311 assert_eq!(
1312 vec![
1313 (&"c".to_string(), &Field::Double(1.0)),
1314 (&"b".to_string(), &Field::Int(1))
1315 ],
1316 result
1317 );
1318 }
1319
1320 #[test]
1321 fn test_into_columns_in_row() {
1322 let r = row![
1323 ("a".to_string(), Field::Str("My string".to_owned())),
1324 ("b".to_string(), Field::Int(1))
1325 ];
1326 assert_eq!(
1327 r.into_columns(),
1328 vec![
1329 ("a".to_string(), Field::Str("My string".to_owned())),
1330 ("b".to_string(), Field::Int(1)),
1331 ]
1332 );
1333 }
1334
1335 #[test]
1336 fn test_file_reader_rows_projection_map() {
1337 let schema = "
1338 message spark_schema {
1339 OPTIONAL group a (MAP) {
1340 REPEATED group key_value {
1341 REQUIRED BYTE_ARRAY key (UTF8);
1342 OPTIONAL group value (MAP) {
1343 REPEATED group key_value {
1344 REQUIRED INT32 key;
1345 REQUIRED BOOLEAN value;
1346 }
1347 }
1348 }
1349 }
1350 }
1351 ";
1352 let schema = parse_message_type(schema).unwrap();
1353 let rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1354 let expected_rows = vec![
1355 row![(
1356 "a".to_string(),
1357 map![(
1358 Field::Str("a".to_string()),
1359 map![
1360 (Field::Int(1), Field::Bool(true)),
1361 (Field::Int(2), Field::Bool(false))
1362 ]
1363 )]
1364 )],
1365 row![(
1366 "a".to_string(),
1367 map![(
1368 Field::Str("b".to_string()),
1369 map![(Field::Int(1), Field::Bool(true))]
1370 )]
1371 )],
1372 row![(
1373 "a".to_string(),
1374 map![(Field::Str("c".to_string()), Field::Null)]
1375 )],
1376 row![("a".to_string(), map![(Field::Str("d".to_string()), map![])])],
1377 row![(
1378 "a".to_string(),
1379 map![(
1380 Field::Str("e".to_string()),
1381 map![(Field::Int(1), Field::Bool(true))]
1382 )]
1383 )],
1384 row![(
1385 "a".to_string(),
1386 map![(
1387 Field::Str("f".to_string()),
1388 map![
1389 (Field::Int(3), Field::Bool(true)),
1390 (Field::Int(4), Field::Bool(false)),
1391 (Field::Int(5), Field::Bool(true))
1392 ]
1393 )]
1394 )],
1395 ];
1396 assert_eq!(rows, expected_rows);
1397 }
1398
#[test]
fn test_file_reader_rows_projection_list() {
    // Projects the complete three-level nested LIST column `a`. Every level is
    // OPTIONAL, so an intermediate list may itself decode as `Field::Null`.
    let projection = parse_message_type(
        "
    message spark_schema {
      OPTIONAL group a (LIST) {
        REPEATED group list {
          OPTIONAL group element (LIST) {
            REPEATED group list {
              OPTIONAL group element (LIST) {
                REPEATED group list {
                  OPTIONAL BYTE_ARRAY element (UTF8);
                }
              }
            }
          }
        }
      }
    }
    ",
    )
    .unwrap();
    let actual = test_file_reader_rows("nested_lists.snappy.parquet", Some(projection)).unwrap();

    // Shorthand for a UTF8 leaf value.
    let text = |v: &str| Field::Str(v.to_string());
    let expected = vec![
        row![(
            "a".to_string(),
            list![
                list![list![text("a"), text("b")], list![text("c")]],
                list![Field::Null, list![text("d")]]
            ]
        )],
        row![(
            "a".to_string(),
            list![
                list![list![text("a"), text("b")], list![text("c"), text("d")]],
                list![Field::Null, list![text("e")]]
            ]
        )],
        row![(
            "a".to_string(),
            list![
                list![
                    list![text("a"), text("b")],
                    list![text("c"), text("d")],
                    list![text("e")]
                ],
                list![Field::Null, list![text("f")]]
            ]
        )],
    ];
    assert_eq!(actual, expected);
}
1455
#[test]
fn test_file_reader_rows_invalid_projection() {
    // `key` and `value` exist in the file only inside the map, not at the root,
    // so the whole-file reader must reject this projection.
    let projection = parse_message_type(
        "
    message spark_schema {
      REQUIRED INT32 key;
      REQUIRED BOOLEAN value;
    }
    ",
    )
    .unwrap();
    let err =
        test_file_reader_rows("nested_maps.snappy.parquet", Some(projection)).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Parquet error: Root schema does not contain projection"
    );
}
1471
#[test]
fn test_row_group_rows_invalid_projection() {
    // Same invalid root-level projection as the file-level test, applied
    // through a single row group reader instead.
    let projection = parse_message_type(
        "
    message spark_schema {
      REQUIRED INT32 key;
      REQUIRED BOOLEAN value;
    }
    ",
    )
    .unwrap();
    let err =
        test_row_group_rows("nested_maps.snappy.parquet", Some(projection)).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Parquet error: Root schema does not contain projection"
    );
}
1487
#[test]
fn test_file_reader_rows_nested_map_type() {
    // The projection keeps only `key` in the inner map's key_value group.
    // Reading with it must succeed; the decoded rows are not inspected.
    let projection = parse_message_type(
        "
    message spark_schema {
      OPTIONAL group a (MAP) {
        REPEATED group key_value {
          REQUIRED BYTE_ARRAY key (UTF8);
          OPTIONAL group value (MAP) {
            REPEATED group key_value {
              REQUIRED INT32 key;
            }
          }
        }
      }
    }
    ",
    )
    .unwrap();
    let outcome = test_file_reader_rows("nested_maps.snappy.parquet", Some(projection));
    outcome.unwrap();
}
1507
#[test]
fn test_file_reader_iter() {
    let file_reader =
        SerializedFileReader::try_from(get_test_path("alltypes_plain.parquet").as_path())
            .unwrap();

    // Read column 0 of every row as an i32; values whose `get_int` fails are
    // skipped rather than reported.
    let ids: Vec<i32> = RowIter::from_file_into(Box::new(file_reader))
        .filter_map(|row| row.unwrap().get_int(0).ok())
        .collect();
    assert_eq!(ids, [4, 5, 6, 7, 2, 3, 0, 1]);
}
1517
#[test]
fn test_file_reader_iter_projection() {
    // Project the file down to its `id` column before iterating.
    let projection = parse_message_type("message schema { OPTIONAL INT32 id; }").ok();
    let file_reader =
        SerializedFileReader::try_from(get_test_path("alltypes_plain.parquet").as_path())
            .unwrap();

    let ids: Vec<i32> = RowIter::from_file_into(Box::new(file_reader))
        .project(projection)
        .unwrap()
        .filter_map(|row| row.unwrap().get_int(0).ok())
        .collect();
    assert_eq!(ids, [4, 5, 6, 7, 2, 3, 0, 1]);
}
1532
#[test]
fn test_file_reader_iter_projection_err() {
    // Projecting root-level `key`/`value` onto a file where they only exist
    // nested inside a map must fail when the projection is applied.
    let projection = parse_message_type(
        "
    message spark_schema {
      REQUIRED INT32 key;
      REQUIRED BOOLEAN value;
    }
    ",
    )
    .ok();
    let file_reader =
        SerializedFileReader::try_from(get_test_path("nested_maps.snappy.parquet").as_path())
            .unwrap();

    let err = RowIter::from_file_into(Box::new(file_reader))
        .project(projection)
        .err()
        .unwrap();
    assert_eq!(
        err.to_string(),
        "Parquet error: Root schema does not contain projection"
    );
}
1551
#[test]
fn test_tree_reader_handle_repeated_fields_with_no_annotation() {
    // One `phone` entry: a `number` plus a `kind` that may be null.
    fn phone(number: i64, kind: Option<&str>) -> Field {
        let kind = kind.map_or(Field::Null, |k| Field::Str(k.to_string()));
        group![
            ("number".to_string(), Field::Long(number)),
            ("kind".to_string(), kind)
        ]
    }

    // The `phoneNumbers` group, whose un-annotated repeated `phone` field
    // decodes as a list.
    fn phone_numbers(phones: Vec<Field>) -> Field {
        group![("phone".to_string(), Field::ListInternal(make_list(phones)))]
    }

    fn record(id: i32, numbers: Field) -> Row {
        row![
            ("id".to_string(), Field::Int(id)),
            ("phoneNumbers".to_string(), numbers)
        ]
    }

    let actual = test_file_reader_rows("repeated_no_annotation.parquet", None).unwrap();
    let expected = vec![
        record(1, Field::Null),
        record(2, Field::Null),
        record(3, phone_numbers(vec![])),
        record(4, phone_numbers(vec![phone(5555555555, None)])),
        record(5, phone_numbers(vec![phone(1111111111, Some("home"))])),
        record(
            6,
            phone_numbers(vec![
                phone(1111111111, Some("home")),
                phone(2222222222, None),
                phone(3333333333, Some("mobile")),
            ]),
        ),
    ];
    assert_eq!(actual, expected);
}
1626
#[test]
fn test_tree_reader_handle_nested_repeated_fields_with_no_annotation() {
    // `level1` and `level2` are REPEATED groups without any LIST annotation.
    let schema = Arc::new(
        parse_message_type(
            "
    message schema {
      REPEATED group level1 {
        REPEATED group level2 {
          REQUIRED group level3 {
            REQUIRED INT64 value3;
          }
        }
        REQUIRED INT64 value1;
      }
    }",
        )
        .unwrap(),
    );

    // Leaf columns in schema order: `value3`, then `value1`. Each value starts
    // a new record (repetition level 0). The definition levels equal the number
    // of repeated ancestors: two for `value3`, one for `value1`.
    let columns: [(&[i64], &[i16]); 2] = [(&[30, 31, 32], &[2, 2, 2]), (&[10, 11, 12], &[1, 1, 1])];

    let mut buffer: Vec<u8> = Vec::new();
    let mut file_writer =
        SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
    let mut row_group_writer = file_writer.next_row_group().unwrap();
    for (values, def_levels) in columns {
        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
        column_writer
            .typed::<Int64Type>()
            .write_batch(values, Some(def_levels), Some(&[0, 0, 0]))
            .unwrap();
        column_writer.close().unwrap();
    }
    row_group_writer.close().unwrap();
    file_writer.close().unwrap();
    assert!(buffer.starts_with(b"PAR1"));

    let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
    let actual: Vec<_> = file_reader
        .get_row_iter(None)
        .unwrap()
        .map(|row| row.unwrap())
        .collect();

    // Every written record has the same shape: one level1 entry holding one
    // level2 entry.
    let expected_row = |value3: i64, value1: i64| {
        row![(
            "level1".to_string(),
            list![group![
                (
                    "level2".to_string(),
                    list![group![(
                        "level3".to_string(),
                        group![("value3".to_string(), Field::Long(value3))]
                    )]]
                ),
                ("value1".to_string(), Field::Long(value1))
            ]]
        )]
    };
    let expected: Vec<_> = [(30, 10), (31, 11), (32, 12)]
        .iter()
        .map(|&(value3, value1)| expected_row(value3, value1))
        .collect();

    assert_eq!(actual, expected);
}
1726
#[test]
fn test_tree_reader_handle_primitive_repeated_fields_with_no_annotation() {
    fn int_list(values: &[i32]) -> Field {
        Field::ListInternal(make_list(values.iter().copied().map(Field::Int).collect()))
    }

    fn str_list(values: &[&str]) -> Field {
        Field::ListInternal(make_list(
            values.iter().map(|&s| Field::Str(s.to_string())).collect(),
        ))
    }

    // Each expected row repeats the same int/string lists twice: once at the
    // top level and once inside `group_of_lists`.
    fn record(ints: &[i32], strs: &[&str]) -> Row {
        row![
            ("Int32_list".to_string(), int_list(ints)),
            ("String_list".to_string(), str_list(strs)),
            (
                "group_of_lists".to_string(),
                group![
                    ("Int32_list_in_group".to_string(), int_list(ints)),
                    ("String_list_in_group".to_string(), str_list(strs))
                ]
            )
        ]
    }

    let actual = test_file_reader_rows("repeated_primitive_no_list.parquet", None).unwrap();
    let expected = vec![
        record(&[0, 1, 2, 3], &["foo", "zero", "one", "two"]),
        record(&[], &["three"]),
        record(&[4], &["four"]),
        record(&[5, 6, 7, 8], &["five", "six", "seven", "eight"]),
    ];
    assert_eq!(actual, expected);
}
1851
#[test]
fn test_map_no_value() {
    let rows = test_file_reader_rows("map_no_value.parquet", None).unwrap();
    // Every assertion below sits inside the loop, so an empty result would let
    // this test pass without checking anything.
    assert!(!rows.is_empty(), "expected map_no_value.parquet to yield rows");

    // Columns 1 and 2 must decode to identical values in every row.
    for row in rows {
        let cols = row.into_columns();
        assert_eq!(cols[1].1, cols[2].1);
    }
}
1881
1882 fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
1883 let file = get_test_file(file_name);
1884 let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
1885 let iter = file_reader.get_row_iter(schema)?;
1886 Ok(iter.map(|row| row.unwrap()).collect())
1887 }
1888
1889 fn test_row_group_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
1890 let file = get_test_file(file_name);
1891 let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
1892 let row_group_reader = file_reader.get_row_group(0).unwrap();
1895 let iter = row_group_reader.get_row_iter(schema)?;
1896 Ok(iter.map(|row| row.unwrap()).collect())
1897 }
1898
#[test]
fn test_read_old_nested_list() {
    // The file holds one row whose column `a` is a list of two int lists.
    let int_pair = |values: [i32; 2]| Field::ListInternal(make_list(values.map(Field::Int).to_vec()));

    let actual = test_file_reader_rows("old_list_structure.parquet", None).unwrap();
    let expected = vec![row![(
        "a".to_string(),
        Field::ListInternal(make_list(vec![int_pair([1, 2]), int_pair([3, 4])]))
    )]];
    assert_eq!(actual, expected);
}
1915}