1use std::{collections::HashMap, fmt, sync::Arc};
22
23use crate::basic::{ConvertedType, Repetition};
24use crate::errors::{ParquetError, Result};
25use crate::file::reader::{FileReader, RowGroupReader};
26use crate::record::{
27 api::{Field, Row, make_list, make_map},
28 triplet::TripletIter,
29};
30use crate::schema::types::{ColumnPath, SchemaDescPtr, SchemaDescriptor, Type, TypePtr};
31
/// Batch size used by a [`TreeBuilder`] when the caller does not pick one.
const DEFAULT_BATCH_SIZE: usize = 1024;

/// Builder that turns a schema plus a row group into a tree of [`Reader`]s.
///
/// The only piece of configuration is the batch size, which is passed on to
/// every `TripletIter` created for a primitive column.
pub struct TreeBuilder {
    /// Batch size handed to each column's `TripletIter`.
    batch_size: usize,
}
42
43impl Default for TreeBuilder {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49impl TreeBuilder {
50 pub fn new() -> Self {
52 Self {
53 batch_size: DEFAULT_BATCH_SIZE,
54 }
55 }
56
57 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
59 self.batch_size = batch_size;
60 self
61 }
62
63 pub fn build(
65 &self,
66 descr: SchemaDescPtr,
67 row_group_reader: &dyn RowGroupReader,
68 ) -> Result<Reader> {
69 let mut paths: HashMap<ColumnPath, usize> = HashMap::new();
72 let row_group_metadata = row_group_reader.metadata();
73
74 for col_index in 0..row_group_reader.num_columns() {
75 let col_meta = row_group_metadata.column(col_index);
76 let col_path = col_meta.column_path().clone();
77 paths.insert(col_path, col_index);
78 }
79
80 let mut readers = Vec::new();
82 let mut path = Vec::new();
83
84 for field in descr.root_schema().get_fields() {
85 let reader =
86 self.reader_tree(field.clone(), &mut path, 0, 0, &paths, row_group_reader)?;
87 readers.push(reader);
88 }
89
90 Ok(Reader::GroupReader(None, 0, readers))
93 }
94
95 pub fn as_iter(
97 &self,
98 descr: SchemaDescPtr,
99 row_group_reader: &dyn RowGroupReader,
100 ) -> Result<ReaderIter> {
101 let num_records = row_group_reader.metadata().num_rows() as usize;
102 ReaderIter::new(self.build(descr, row_group_reader)?, num_records)
103 }
104
105 fn reader_tree(
107 &self,
108 field: TypePtr,
109 path: &mut Vec<String>,
110 mut curr_def_level: i16,
111 mut curr_rep_level: i16,
112 paths: &HashMap<ColumnPath, usize>,
113 row_group_reader: &dyn RowGroupReader,
114 ) -> Result<Reader> {
115 assert!(field.get_basic_info().has_repetition());
116 let repetition = field.get_basic_info().repetition();
118 match repetition {
119 Repetition::OPTIONAL => {
120 curr_def_level += 1;
121 }
122 Repetition::REPEATED => {
123 curr_def_level += 1;
124 curr_rep_level += 1;
125 }
126 _ => {}
127 }
128
129 path.push(String::from(field.name()));
130 let reader = if field.is_primitive() {
131 let col_path = ColumnPath::new(path.to_vec());
132 let orig_index = *paths
133 .get(&col_path)
134 .ok_or(general_err!("Path {:?} not found", col_path))?;
135 let col_descr = row_group_reader
136 .metadata()
137 .column(orig_index)
138 .column_descr_ptr();
139 let col_reader = row_group_reader.get_column_reader(orig_index)?;
140 let column = TripletIter::new(col_descr, col_reader, self.batch_size);
141 let reader = Reader::PrimitiveReader(field.clone(), Box::new(column));
142 if repetition == Repetition::REPEATED {
143 Reader::RepeatedReader(
144 field,
145 curr_def_level - 1,
146 curr_rep_level - 1,
147 Box::new(reader),
148 )
149 } else {
150 reader
151 }
152 } else {
153 match field.get_basic_info().converted_type() {
154 ConvertedType::LIST => {
156 assert_eq!(field.get_fields().len(), 1, "Invalid list type {field:?}");
157
158 let repeated_field = field.get_fields()[0].clone();
159 assert_eq!(
160 repeated_field.get_basic_info().repetition(),
161 Repetition::REPEATED,
162 "Invalid list type {field:?}"
163 );
164
165 if Reader::is_element_type(&repeated_field) {
166 let reader = self.reader_tree(
168 repeated_field,
169 path,
170 curr_def_level,
171 curr_rep_level,
172 paths,
173 row_group_reader,
174 )?;
175
176 Reader::RepeatedReader(
177 field,
178 curr_def_level,
179 curr_rep_level,
180 Box::new(reader),
181 )
182 } else {
183 let child_field = repeated_field.get_fields()[0].clone();
184
185 path.push(String::from(repeated_field.name()));
186
187 let reader = self.reader_tree(
188 child_field,
189 path,
190 curr_def_level + 1,
191 curr_rep_level + 1,
192 paths,
193 row_group_reader,
194 )?;
195
196 path.pop();
197
198 Reader::RepeatedReader(
199 field,
200 curr_def_level,
201 curr_rep_level,
202 Box::new(reader),
203 )
204 }
205 }
206 ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
208 assert_eq!(field.get_fields().len(), 1, "Invalid map type: {field:?}");
209 assert!(
210 !field.get_fields()[0].is_primitive(),
211 "Invalid map type: {field:?}"
212 );
213
214 let key_value_type = field.get_fields()[0].clone();
215 assert_eq!(
216 key_value_type.get_basic_info().repetition(),
217 Repetition::REPEATED,
218 "Invalid map type: {field:?}"
219 );
220 if key_value_type.get_fields().len() != 1 {
222 assert_eq!(
224 key_value_type.get_fields().len(),
225 2,
226 "Invalid map type: {field:?}"
227 );
228 }
229
230 path.push(String::from(key_value_type.name()));
231
232 let key_type = &key_value_type.get_fields()[0];
233 assert!(
234 key_type.is_primitive(),
235 "Map key type is expected to be a primitive type, but found {key_type:?}"
236 );
237 let key_reader = self.reader_tree(
238 key_type.clone(),
239 path,
240 curr_def_level + 1,
241 curr_rep_level + 1,
242 paths,
243 row_group_reader,
244 )?;
245
246 if key_value_type.get_fields().len() == 1 {
247 path.pop();
248 Reader::RepeatedReader(
249 field,
250 curr_def_level,
251 curr_rep_level,
252 Box::new(key_reader),
253 )
254 } else {
255 let value_type = &key_value_type.get_fields()[1];
256 let value_reader = self.reader_tree(
257 value_type.clone(),
258 path,
259 curr_def_level + 1,
260 curr_rep_level + 1,
261 paths,
262 row_group_reader,
263 )?;
264
265 path.pop();
266
267 Reader::KeyValueReader(
268 field,
269 curr_def_level,
270 curr_rep_level,
271 Box::new(key_reader),
272 Box::new(value_reader),
273 )
274 }
275 }
276 _ if repetition == Repetition::REPEATED => {
281 let required_field = Type::group_type_builder(field.name())
282 .with_repetition(Repetition::REQUIRED)
283 .with_converted_type(field.get_basic_info().converted_type())
284 .with_fields(field.get_fields().to_vec())
285 .build()?;
286
287 path.pop();
288
289 let reader = self.reader_tree(
290 Arc::new(required_field),
291 path,
292 curr_def_level,
293 curr_rep_level,
294 paths,
295 row_group_reader,
296 )?;
297
298 return Ok(Reader::RepeatedReader(
299 field,
300 curr_def_level - 1,
301 curr_rep_level - 1,
302 Box::new(reader),
303 ));
304 }
305 _ => {
307 let mut readers = Vec::new();
308 for child in field.get_fields() {
309 let reader = self.reader_tree(
310 child.clone(),
311 path,
312 curr_def_level,
313 curr_rep_level,
314 paths,
315 row_group_reader,
316 )?;
317 readers.push(reader);
318 }
319 Reader::GroupReader(Some(field), curr_def_level, readers)
320 }
321 }
322 };
323 path.pop();
324
325 Ok(Reader::option(repetition, curr_def_level, reader))
326 }
327}
328
329pub enum Reader {
331 PrimitiveReader(TypePtr, Box<TripletIter>),
333 OptionReader(i16, Box<Reader>),
335 GroupReader(Option<TypePtr>, i16, Vec<Reader>),
338 RepeatedReader(TypePtr, i16, i16, Box<Reader>),
341 KeyValueReader(TypePtr, i16, i16, Box<Reader>, Box<Reader>),
344}
345
346impl Reader {
347 fn option(repetition: Repetition, def_level: i16, reader: Reader) -> Self {
349 if repetition == Repetition::OPTIONAL {
350 Reader::OptionReader(def_level - 1, Box::new(reader))
351 } else {
352 reader
353 }
354 }
355
356 fn is_element_type(repeated_type: &Type) -> bool {
362 if repeated_type.is_list() || repeated_type.has_single_repeated_child() {
372 return false;
373 }
374
375 repeated_type.is_primitive() ||
383 repeated_type.is_group() && repeated_type.get_fields().len() > 1 ||
395 repeated_type.name() == "array" ||
406 repeated_type.name().ends_with("_tuple")
416 }
417
418 fn read(&mut self) -> Result<Row> {
423 match *self {
424 Reader::GroupReader(_, _, ref mut readers) => {
425 let mut fields = Vec::new();
426 for reader in readers {
427 fields.push((String::from(reader.field_name()), reader.read_field()?));
428 }
429 Ok(Row::new(fields))
430 }
431 _ => panic!("Cannot call read() on {self}"),
432 }
433 }
434
435 fn read_field(&mut self) -> Result<Field> {
438 let field = match *self {
439 Reader::PrimitiveReader(_, ref mut column) => {
440 if !column.has_next() {
441 return Err(general_err!("Unexpected end of column data"));
442 }
443 let value = column.current_value()?;
444 column.read_next()?;
445 value
446 }
447 Reader::OptionReader(def_level, ref mut reader) => {
448 if !reader.has_next() {
449 return Err(general_err!("Unexpected end of column data"));
450 }
451 if reader.current_def_level() > def_level {
452 reader.read_field()?
453 } else {
454 reader.advance_columns()?;
455 Field::Null
456 }
457 }
458 Reader::GroupReader(_, def_level, ref mut readers) => {
459 let mut fields = Vec::new();
460 for reader in readers {
461 if reader.repetition() != Repetition::OPTIONAL
462 || reader.current_def_level() > def_level
463 {
464 fields.push((String::from(reader.field_name()), reader.read_field()?));
465 } else {
466 reader.advance_columns()?;
467 fields.push((String::from(reader.field_name()), Field::Null));
468 }
469 }
470 let row = Row::new(fields);
471 Field::Group(row)
472 }
473 Reader::RepeatedReader(_, def_level, rep_level, ref mut reader) => {
474 if !reader.has_next() {
475 return Err(general_err!("Unexpected end of column data"));
476 }
477 let mut elements = Vec::new();
478 loop {
479 if reader.current_def_level() > def_level {
480 elements.push(reader.read_field()?);
481 } else {
482 reader.advance_columns()?;
483 break;
488 }
489
490 if !reader.has_next() || reader.current_rep_level() <= rep_level {
494 break;
495 }
496 }
497 Field::ListInternal(make_list(elements))
498 }
499 Reader::KeyValueReader(_, def_level, rep_level, ref mut keys, ref mut values) => {
500 if !keys.has_next() {
501 return Err(general_err!("Unexpected end of column data"));
502 }
503 let mut pairs = Vec::new();
504 loop {
505 if keys.current_def_level() > def_level {
506 pairs.push((keys.read_field()?, values.read_field()?));
507 } else {
508 keys.advance_columns()?;
509 values.advance_columns()?;
510 break;
515 }
516
517 if !keys.has_next() || keys.current_rep_level() <= rep_level {
521 break;
522 }
523 }
524
525 Field::MapInternal(make_map(pairs))
526 }
527 };
528 Ok(field)
529 }
530
531 fn field_name(&self) -> &str {
533 match *self {
534 Reader::PrimitiveReader(ref field, _) => field.name(),
535 Reader::OptionReader(_, ref reader) => reader.field_name(),
536 Reader::GroupReader(ref opt, ..) => match opt {
537 Some(field) => field.name(),
538 None => panic!("Field is None for group reader"),
539 },
540 Reader::RepeatedReader(ref field, ..) => field.name(),
541 Reader::KeyValueReader(ref field, ..) => field.name(),
542 }
543 }
544
545 fn repetition(&self) -> Repetition {
547 match *self {
548 Reader::PrimitiveReader(ref field, _) => field.get_basic_info().repetition(),
549 Reader::OptionReader(_, ref reader) => reader.repetition(),
550 Reader::GroupReader(ref opt, ..) => match opt {
551 Some(field) => field.get_basic_info().repetition(),
552 None => panic!("Field is None for group reader"),
553 },
554 Reader::RepeatedReader(ref field, ..) => field.get_basic_info().repetition(),
555 Reader::KeyValueReader(ref field, ..) => field.get_basic_info().repetition(),
556 }
557 }
558
559 fn has_next(&self) -> bool {
562 match *self {
563 Reader::PrimitiveReader(_, ref column) => column.has_next(),
564 Reader::OptionReader(_, ref reader) => reader.has_next(),
565 Reader::GroupReader(_, _, ref readers) => readers.first().unwrap().has_next(),
566 Reader::RepeatedReader(_, _, _, ref reader) => reader.has_next(),
567 Reader::KeyValueReader(_, _, _, ref keys, _) => keys.has_next(),
568 }
569 }
570
571 fn current_def_level(&self) -> i16 {
574 match *self {
575 Reader::PrimitiveReader(_, ref column) => column.current_def_level(),
576 Reader::OptionReader(_, ref reader) => reader.current_def_level(),
577 Reader::GroupReader(_, _, ref readers) => match readers.first() {
578 Some(reader) => reader.current_def_level(),
579 None => panic!("Current definition level: empty group reader"),
580 },
581 Reader::RepeatedReader(_, _, _, ref reader) => reader.current_def_level(),
582 Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_def_level(),
583 }
584 }
585
586 fn current_rep_level(&self) -> i16 {
589 match *self {
590 Reader::PrimitiveReader(_, ref column) => column.current_rep_level(),
591 Reader::OptionReader(_, ref reader) => reader.current_rep_level(),
592 Reader::GroupReader(_, _, ref readers) => match readers.first() {
593 Some(reader) => reader.current_rep_level(),
594 None => panic!("Current repetition level: empty group reader"),
595 },
596 Reader::RepeatedReader(_, _, _, ref reader) => reader.current_rep_level(),
597 Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_rep_level(),
598 }
599 }
600
601 fn advance_columns(&mut self) -> Result<()> {
603 match *self {
604 Reader::PrimitiveReader(_, ref mut column) => column.read_next().map(|_| ()),
605 Reader::OptionReader(_, ref mut reader) => reader.advance_columns(),
606 Reader::GroupReader(_, _, ref mut readers) => {
607 for reader in readers {
608 reader.advance_columns()?;
609 }
610 Ok(())
611 }
612 Reader::RepeatedReader(_, _, _, ref mut reader) => reader.advance_columns(),
613 Reader::KeyValueReader(_, _, _, ref mut keys, ref mut values) => {
614 keys.advance_columns()?;
615 values.advance_columns()
616 }
617 }
618 }
619}
620
621impl fmt::Display for Reader {
622 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
623 let s = match self {
624 Reader::PrimitiveReader(..) => "PrimitiveReader",
625 Reader::OptionReader(..) => "OptionReader",
626 Reader::GroupReader(..) => "GroupReader",
627 Reader::RepeatedReader(..) => "RepeatedReader",
628 Reader::KeyValueReader(..) => "KeyValueReader",
629 };
630 write!(f, "{s}")
631 }
632}
633
634enum Either<'a> {
640 Left(&'a dyn FileReader),
641 Right(Box<dyn FileReader>),
642}
643
644impl Either<'_> {
645 fn reader(&self) -> &dyn FileReader {
646 match *self {
647 Either::Left(r) => r,
648 Either::Right(ref r) => &**r,
649 }
650 }
651}
652
653pub struct RowIter<'a> {
668 descr: SchemaDescPtr,
669 tree_builder: TreeBuilder,
670 file_reader: Option<Either<'a>>,
671 current_row_group: usize,
672 num_row_groups: usize,
673 row_iter: Option<ReaderIter>,
674}
675
676impl<'a> RowIter<'a> {
677 fn new(
679 file_reader: Option<Either<'a>>,
680 row_iter: Option<ReaderIter>,
681 descr: SchemaDescPtr,
682 ) -> Self {
683 let tree_builder = Self::tree_builder();
684 let num_row_groups = match file_reader {
685 Some(ref r) => r.reader().num_row_groups(),
686 None => 0,
687 };
688
689 Self {
690 descr,
691 file_reader,
692 tree_builder,
693 num_row_groups,
694 row_iter,
695 current_row_group: 0,
696 }
697 }
698
699 pub fn from_file(proj: Option<Type>, reader: &'a dyn FileReader) -> Result<Self> {
702 let either = Either::Left(reader);
703 let descr =
704 Self::get_proj_descr(proj, reader.metadata().file_metadata().schema_descr_ptr())?;
705
706 Ok(Self::new(Some(either), None, descr))
707 }
708
709 pub fn from_row_group(proj: Option<Type>, reader: &'a dyn RowGroupReader) -> Result<Self> {
711 let descr = Self::get_proj_descr(proj, reader.metadata().schema_descr_ptr())?;
712 let tree_builder = Self::tree_builder();
713 let row_iter = tree_builder.as_iter(descr.clone(), reader)?;
714
715 Ok(Self::new(None, Some(row_iter), descr))
718 }
719
720 pub fn from_file_into(reader: Box<dyn FileReader>) -> Self {
722 let either = Either::Right(reader);
723 let descr = either
724 .reader()
725 .metadata()
726 .file_metadata()
727 .schema_descr_ptr();
728
729 Self::new(Some(either), None, descr)
730 }
731
732 pub fn project(self, proj: Option<Type>) -> Result<Self> {
738 match self.file_reader {
739 Some(ref either) => {
740 let schema = either
741 .reader()
742 .metadata()
743 .file_metadata()
744 .schema_descr_ptr();
745 let descr = Self::get_proj_descr(proj, schema)?;
746
747 Ok(Self::new(self.file_reader, None, descr))
748 }
749 None => Err(general_err!("File reader is required to use projections")),
750 }
751 }
752
753 #[inline]
756 fn get_proj_descr(proj: Option<Type>, root_descr: SchemaDescPtr) -> Result<SchemaDescPtr> {
757 match proj {
758 Some(projection) => {
759 let root_schema = root_descr.root_schema();
761 if !root_schema.check_contains(&projection) {
762 return Err(general_err!("Root schema does not contain projection"));
763 }
764 Ok(Arc::new(SchemaDescriptor::new(Arc::new(projection))))
765 }
766 None => Ok(root_descr),
767 }
768 }
769
770 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
772 self.tree_builder = self.tree_builder.with_batch_size(batch_size);
773 self
774 }
775
776 #[inline]
779 fn tree_builder() -> TreeBuilder {
780 TreeBuilder::new()
781 }
782}
783
784impl Iterator for RowIter<'_> {
785 type Item = Result<Row>;
786
787 fn next(&mut self) -> Option<Result<Row>> {
788 let mut row = None;
789 if let Some(ref mut iter) = self.row_iter {
790 row = iter.next();
791 }
792
793 while row.is_none() && self.current_row_group < self.num_row_groups {
794 if let Some(ref either) = self.file_reader {
797 let file_reader = either.reader();
798 let row_group_reader = &*file_reader
799 .get_row_group(self.current_row_group)
800 .expect("Row group is required to advance");
801
802 match self
803 .tree_builder
804 .as_iter(self.descr.clone(), row_group_reader)
805 {
806 Ok(mut iter) => {
807 row = iter.next();
808
809 self.current_row_group += 1;
810 self.row_iter = Some(iter);
811 }
812 Err(e) => return Some(Err(e)),
813 }
814 }
815 }
816
817 row
818 }
819}
820
821pub struct ReaderIter {
823 root_reader: Reader,
824 records_left: usize,
825}
826
827impl ReaderIter {
828 fn new(mut root_reader: Reader, num_records: usize) -> Result<Self> {
829 root_reader.advance_columns()?;
831 Ok(Self {
832 root_reader,
833 records_left: num_records,
834 })
835 }
836}
837
838impl Iterator for ReaderIter {
839 type Item = Result<Row>;
840
841 fn next(&mut self) -> Option<Result<Row>> {
842 if self.records_left > 0 {
843 self.records_left -= 1;
844 Some(self.root_reader.read())
845 } else {
846 None
847 }
848 }
849}
850
851#[cfg(test)]
852mod tests {
853 use super::*;
854
855 use crate::data_type::Int64Type;
856 use crate::file::reader::SerializedFileReader;
857 use crate::file::writer::SerializedFileWriter;
858 use crate::record::api::RowAccessor;
859 use crate::schema::parser::parse_message_type;
860 use crate::util::test_common::file_util::{get_test_file, get_test_path};
861 use bytes::Bytes;
862
863 macro_rules! row {
866 ($($e:tt)*) => {
867 {
868 Row::new(vec![$($e)*])
869 }
870 }
871 }
872
873 macro_rules! list {
874 ($($e:tt)*) => {
875 {
876 Field::ListInternal(make_list(vec![$($e)*]))
877 }
878 }
879 }
880
881 macro_rules! map {
882 ($($e:tt)*) => {
883 {
884 Field::MapInternal(make_map(vec![$($e)*]))
885 }
886 }
887 }
888
889 macro_rules! group {
890 ( $( $e:expr ), * ) => {
891 {
892 Field::Group(row!($( $e ), *))
893 }
894 }
895 }
896
897 #[test]
898 fn test_file_reader_rows_nulls() {
899 let rows = test_file_reader_rows("nulls.snappy.parquet", None).unwrap();
900 let expected_rows = vec![
901 row![(
902 "b_struct".to_string(),
903 group![("b_c_int".to_string(), Field::Null)]
904 )],
905 row![(
906 "b_struct".to_string(),
907 group![("b_c_int".to_string(), Field::Null)]
908 )],
909 row![(
910 "b_struct".to_string(),
911 group![("b_c_int".to_string(), Field::Null)]
912 )],
913 row![(
914 "b_struct".to_string(),
915 group![("b_c_int".to_string(), Field::Null)]
916 )],
917 row![(
918 "b_struct".to_string(),
919 group![("b_c_int".to_string(), Field::Null)]
920 )],
921 row![(
922 "b_struct".to_string(),
923 group![("b_c_int".to_string(), Field::Null)]
924 )],
925 row![(
926 "b_struct".to_string(),
927 group![("b_c_int".to_string(), Field::Null)]
928 )],
929 row![(
930 "b_struct".to_string(),
931 group![("b_c_int".to_string(), Field::Null)]
932 )],
933 ];
934 assert_eq!(rows, expected_rows);
935 }
936
937 #[test]
938 fn test_file_reader_rows_nonnullable() {
939 let rows = test_file_reader_rows("nonnullable.impala.parquet", None).unwrap();
940 let expected_rows = vec![row![
941 ("ID".to_string(), Field::Long(8)),
942 ("Int_Array".to_string(), list![Field::Int(-1)]),
943 (
944 "int_array_array".to_string(),
945 list![list![Field::Int(-1), Field::Int(-2)], list![]]
946 ),
947 (
948 "Int_Map".to_string(),
949 map![(Field::Str("k1".to_string()), Field::Int(-1))]
950 ),
951 (
952 "int_map_array".to_string(),
953 list![
954 map![],
955 map![(Field::Str("k1".to_string()), Field::Int(1))],
956 map![],
957 map![]
958 ]
959 ),
960 (
961 "nested_Struct".to_string(),
962 group![
963 ("a".to_string(), Field::Int(-1)),
964 ("B".to_string(), list![Field::Int(-1)]),
965 (
966 "c".to_string(),
967 group![(
968 "D".to_string(),
969 list![list![group![
970 ("e".to_string(), Field::Int(-1)),
971 ("f".to_string(), Field::Str("nonnullable".to_string()))
972 ]]]
973 )]
974 ),
975 ("G".to_string(), map![])
976 ]
977 )
978 ]];
979 assert_eq!(rows, expected_rows);
980 }
981
982 #[test]
983 fn test_file_reader_rows_nullable() {
984 let rows = test_file_reader_rows("nullable.impala.parquet", None).unwrap();
985 let expected_rows = vec![
986 row![
987 ("id".to_string(), Field::Long(1)),
988 (
989 "int_array".to_string(),
990 list![Field::Int(1), Field::Int(2), Field::Int(3)]
991 ),
992 (
993 "int_array_Array".to_string(),
994 list![
995 list![Field::Int(1), Field::Int(2)],
996 list![Field::Int(3), Field::Int(4)]
997 ]
998 ),
999 (
1000 "int_map".to_string(),
1001 map![
1002 (Field::Str("k1".to_string()), Field::Int(1)),
1003 (Field::Str("k2".to_string()), Field::Int(100))
1004 ]
1005 ),
1006 (
1007 "int_Map_Array".to_string(),
1008 list![map![(Field::Str("k1".to_string()), Field::Int(1))]]
1009 ),
1010 (
1011 "nested_struct".to_string(),
1012 group![
1013 ("A".to_string(), Field::Int(1)),
1014 ("b".to_string(), list![Field::Int(1)]),
1015 (
1016 "C".to_string(),
1017 group![(
1018 "d".to_string(),
1019 list![
1020 list![
1021 group![
1022 ("E".to_string(), Field::Int(10)),
1023 ("F".to_string(), Field::Str("aaa".to_string()))
1024 ],
1025 group![
1026 ("E".to_string(), Field::Int(-10)),
1027 ("F".to_string(), Field::Str("bbb".to_string()))
1028 ]
1029 ],
1030 list![group![
1031 ("E".to_string(), Field::Int(11)),
1032 ("F".to_string(), Field::Str("c".to_string()))
1033 ]]
1034 ]
1035 )]
1036 ),
1037 (
1038 "g".to_string(),
1039 map![(
1040 Field::Str("foo".to_string()),
1041 group![(
1042 "H".to_string(),
1043 group![("i".to_string(), list![Field::Double(1.1)])]
1044 )]
1045 )]
1046 )
1047 ]
1048 )
1049 ],
1050 row![
1051 ("id".to_string(), Field::Long(2)),
1052 (
1053 "int_array".to_string(),
1054 list![
1055 Field::Null,
1056 Field::Int(1),
1057 Field::Int(2),
1058 Field::Null,
1059 Field::Int(3),
1060 Field::Null
1061 ]
1062 ),
1063 (
1064 "int_array_Array".to_string(),
1065 list![
1066 list![Field::Null, Field::Int(1), Field::Int(2), Field::Null],
1067 list![Field::Int(3), Field::Null, Field::Int(4)],
1068 list![],
1069 Field::Null
1070 ]
1071 ),
1072 (
1073 "int_map".to_string(),
1074 map![
1075 (Field::Str("k1".to_string()), Field::Int(2)),
1076 (Field::Str("k2".to_string()), Field::Null)
1077 ]
1078 ),
1079 (
1080 "int_Map_Array".to_string(),
1081 list![
1082 map![
1083 (Field::Str("k3".to_string()), Field::Null),
1084 (Field::Str("k1".to_string()), Field::Int(1))
1085 ],
1086 Field::Null,
1087 map![]
1088 ]
1089 ),
1090 (
1091 "nested_struct".to_string(),
1092 group![
1093 ("A".to_string(), Field::Null),
1094 ("b".to_string(), list![Field::Null]),
1095 (
1096 "C".to_string(),
1097 group![(
1098 "d".to_string(),
1099 list![
1100 list![
1101 group![
1102 ("E".to_string(), Field::Null),
1103 ("F".to_string(), Field::Null)
1104 ],
1105 group![
1106 ("E".to_string(), Field::Int(10)),
1107 ("F".to_string(), Field::Str("aaa".to_string()))
1108 ],
1109 group![
1110 ("E".to_string(), Field::Null),
1111 ("F".to_string(), Field::Null)
1112 ],
1113 group![
1114 ("E".to_string(), Field::Int(-10)),
1115 ("F".to_string(), Field::Str("bbb".to_string()))
1116 ],
1117 group![
1118 ("E".to_string(), Field::Null),
1119 ("F".to_string(), Field::Null)
1120 ]
1121 ],
1122 list![
1123 group![
1124 ("E".to_string(), Field::Int(11)),
1125 ("F".to_string(), Field::Str("c".to_string()))
1126 ],
1127 Field::Null
1128 ],
1129 list![],
1130 Field::Null
1131 ]
1132 )]
1133 ),
1134 (
1135 "g".to_string(),
1136 map![
1137 (
1138 Field::Str("g1".to_string()),
1139 group![(
1140 "H".to_string(),
1141 group![(
1142 "i".to_string(),
1143 list![Field::Double(2.2), Field::Null]
1144 )]
1145 )]
1146 ),
1147 (
1148 Field::Str("g2".to_string()),
1149 group![("H".to_string(), group![("i".to_string(), list![])])]
1150 ),
1151 (Field::Str("g3".to_string()), Field::Null),
1152 (
1153 Field::Str("g4".to_string()),
1154 group![(
1155 "H".to_string(),
1156 group![("i".to_string(), Field::Null)]
1157 )]
1158 ),
1159 (
1160 Field::Str("g5".to_string()),
1161 group![("H".to_string(), Field::Null)]
1162 )
1163 ]
1164 )
1165 ]
1166 )
1167 ],
1168 row![
1169 ("id".to_string(), Field::Long(3)),
1170 ("int_array".to_string(), list![]),
1171 ("int_array_Array".to_string(), list![Field::Null]),
1172 ("int_map".to_string(), map![]),
1173 ("int_Map_Array".to_string(), list![Field::Null, Field::Null]),
1174 (
1175 "nested_struct".to_string(),
1176 group![
1177 ("A".to_string(), Field::Null),
1178 ("b".to_string(), Field::Null),
1179 ("C".to_string(), group![("d".to_string(), list![])]),
1180 ("g".to_string(), map![])
1181 ]
1182 )
1183 ],
1184 row![
1185 ("id".to_string(), Field::Long(4)),
1186 ("int_array".to_string(), Field::Null),
1187 ("int_array_Array".to_string(), list![]),
1188 ("int_map".to_string(), map![]),
1189 ("int_Map_Array".to_string(), list![]),
1190 (
1191 "nested_struct".to_string(),
1192 group![
1193 ("A".to_string(), Field::Null),
1194 ("b".to_string(), Field::Null),
1195 ("C".to_string(), group![("d".to_string(), Field::Null)]),
1196 ("g".to_string(), Field::Null)
1197 ]
1198 )
1199 ],
1200 row![
1201 ("id".to_string(), Field::Long(5)),
1202 ("int_array".to_string(), Field::Null),
1203 ("int_array_Array".to_string(), Field::Null),
1204 ("int_map".to_string(), map![]),
1205 ("int_Map_Array".to_string(), Field::Null),
1206 (
1207 "nested_struct".to_string(),
1208 group![
1209 ("A".to_string(), Field::Null),
1210 ("b".to_string(), Field::Null),
1211 ("C".to_string(), Field::Null),
1212 (
1213 "g".to_string(),
1214 map![(
1215 Field::Str("foo".to_string()),
1216 group![(
1217 "H".to_string(),
1218 group![(
1219 "i".to_string(),
1220 list![Field::Double(2.2), Field::Double(3.3)]
1221 )]
1222 )]
1223 )]
1224 )
1225 ]
1226 )
1227 ],
1228 row![
1229 ("id".to_string(), Field::Long(6)),
1230 ("int_array".to_string(), Field::Null),
1231 ("int_array_Array".to_string(), Field::Null),
1232 ("int_map".to_string(), Field::Null),
1233 ("int_Map_Array".to_string(), Field::Null),
1234 ("nested_struct".to_string(), Field::Null)
1235 ],
1236 row![
1237 ("id".to_string(), Field::Long(7)),
1238 ("int_array".to_string(), Field::Null),
1239 (
1240 "int_array_Array".to_string(),
1241 list![Field::Null, list![Field::Int(5), Field::Int(6)]]
1242 ),
1243 (
1244 "int_map".to_string(),
1245 map![
1246 (Field::Str("k1".to_string()), Field::Null),
1247 (Field::Str("k3".to_string()), Field::Null)
1248 ]
1249 ),
1250 ("int_Map_Array".to_string(), Field::Null),
1251 (
1252 "nested_struct".to_string(),
1253 group![
1254 ("A".to_string(), Field::Int(7)),
1255 (
1256 "b".to_string(),
1257 list![Field::Int(2), Field::Int(3), Field::Null]
1258 ),
1259 (
1260 "C".to_string(),
1261 group![(
1262 "d".to_string(),
1263 list![list![], list![Field::Null], Field::Null]
1264 )]
1265 ),
1266 ("g".to_string(), Field::Null)
1267 ]
1268 )
1269 ],
1270 ];
1271 assert_eq!(rows, expected_rows);
1272 }
1273
1274 #[test]
1275 fn test_file_reader_rows_projection() {
1276 let schema = "
1277 message spark_schema {
1278 REQUIRED DOUBLE c;
1279 REQUIRED INT32 b;
1280 }
1281 ";
1282 let schema = parse_message_type(schema).unwrap();
1283 let rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1284 let expected_rows = vec![
1285 row![
1286 ("c".to_string(), Field::Double(1.0)),
1287 ("b".to_string(), Field::Int(1))
1288 ],
1289 row![
1290 ("c".to_string(), Field::Double(1.0)),
1291 ("b".to_string(), Field::Int(1))
1292 ],
1293 row![
1294 ("c".to_string(), Field::Double(1.0)),
1295 ("b".to_string(), Field::Int(1))
1296 ],
1297 row![
1298 ("c".to_string(), Field::Double(1.0)),
1299 ("b".to_string(), Field::Int(1))
1300 ],
1301 row![
1302 ("c".to_string(), Field::Double(1.0)),
1303 ("b".to_string(), Field::Int(1))
1304 ],
1305 row![
1306 ("c".to_string(), Field::Double(1.0)),
1307 ("b".to_string(), Field::Int(1))
1308 ],
1309 ];
1310 assert_eq!(rows, expected_rows);
1311 }
1312
1313 #[test]
1314 fn test_iter_columns_in_row() {
1315 let r = row![
1316 ("c".to_string(), Field::Double(1.0)),
1317 ("b".to_string(), Field::Int(1))
1318 ];
1319 let mut result = Vec::new();
1320 for (name, record) in r.get_column_iter() {
1321 result.push((name, record));
1322 }
1323 assert_eq!(
1324 vec![
1325 (&"c".to_string(), &Field::Double(1.0)),
1326 (&"b".to_string(), &Field::Int(1))
1327 ],
1328 result
1329 );
1330 }
1331
1332 #[test]
1333 fn test_into_columns_in_row() {
1334 let r = row![
1335 ("a".to_string(), Field::Str("My string".to_owned())),
1336 ("b".to_string(), Field::Int(1))
1337 ];
1338 assert_eq!(
1339 r.into_columns(),
1340 vec![
1341 ("a".to_string(), Field::Str("My string".to_owned())),
1342 ("b".to_string(), Field::Int(1)),
1343 ]
1344 );
1345 }
1346
1347 #[test]
1348 fn test_file_reader_rows_projection_map() {
1349 let schema = "
1350 message spark_schema {
1351 OPTIONAL group a (MAP) {
1352 REPEATED group key_value {
1353 REQUIRED BYTE_ARRAY key (UTF8);
1354 OPTIONAL group value (MAP) {
1355 REPEATED group key_value {
1356 REQUIRED INT32 key;
1357 REQUIRED BOOLEAN value;
1358 }
1359 }
1360 }
1361 }
1362 }
1363 ";
1364 let schema = parse_message_type(schema).unwrap();
1365 let rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1366 let expected_rows = vec![
1367 row![(
1368 "a".to_string(),
1369 map![(
1370 Field::Str("a".to_string()),
1371 map![
1372 (Field::Int(1), Field::Bool(true)),
1373 (Field::Int(2), Field::Bool(false))
1374 ]
1375 )]
1376 )],
1377 row![(
1378 "a".to_string(),
1379 map![(
1380 Field::Str("b".to_string()),
1381 map![(Field::Int(1), Field::Bool(true))]
1382 )]
1383 )],
1384 row![(
1385 "a".to_string(),
1386 map![(Field::Str("c".to_string()), Field::Null)]
1387 )],
1388 row![("a".to_string(), map![(Field::Str("d".to_string()), map![])])],
1389 row![(
1390 "a".to_string(),
1391 map![(
1392 Field::Str("e".to_string()),
1393 map![(Field::Int(1), Field::Bool(true))]
1394 )]
1395 )],
1396 row![(
1397 "a".to_string(),
1398 map![(
1399 Field::Str("f".to_string()),
1400 map![
1401 (Field::Int(3), Field::Bool(true)),
1402 (Field::Int(4), Field::Bool(false)),
1403 (Field::Int(5), Field::Bool(true))
1404 ]
1405 )]
1406 )],
1407 ];
1408 assert_eq!(rows, expected_rows);
1409 }
1410
#[test]
fn test_file_reader_rows_projection_list() {
    // Reads `nested_lists.snappy.parquet` through a projection that keeps only the
    // triply nested list column `a` and checks the three decoded rows.
    let projection = "
      message spark_schema {
        OPTIONAL group a (LIST) {
          REPEATED group list {
            OPTIONAL group element (LIST) {
              REPEATED group list {
                OPTIONAL group element (LIST) {
                  REPEATED group list {
                    OPTIONAL BYTE_ARRAY element (UTF8);
                  }
                }
              }
            }
          }
        }
      }
    ";
    let projection = parse_message_type(projection).unwrap();
    let actual = test_file_reader_rows("nested_lists.snappy.parquet", Some(projection)).unwrap();

    // Shorthand for the innermost string elements.
    let text = |value: &str| Field::Str(value.to_string());

    let expected = vec![
        row![(
            "a".to_string(),
            list![
                list![list![text("a"), text("b")], list![text("c")]],
                list![Field::Null, list![text("d")]]
            ]
        )],
        row![(
            "a".to_string(),
            list![
                list![list![text("a"), text("b")], list![text("c"), text("d")]],
                list![Field::Null, list![text("e")]]
            ]
        )],
        row![(
            "a".to_string(),
            list![
                list![
                    list![text("a"), text("b")],
                    list![text("c"), text("d")],
                    list![text("e")]
                ],
                list![Field::Null, list![text("f")]]
            ]
        )],
    ];
    assert_eq!(actual, expected);
}
1467
#[test]
fn test_file_reader_rows_invalid_projection() {
    // The projection names `key` and `value` as root-level fields; the file-level
    // row iterator is expected to reject it with a "does not contain projection" error.
    let projection = parse_message_type(
        "
      message spark_schema {
        REQUIRED INT32 key;
        REQUIRED BOOLEAN value;
      }
    ",
    )
    .unwrap();
    let outcome = test_file_reader_rows("nested_maps.snappy.parquet", Some(projection));
    let message = outcome.unwrap_err().to_string();
    assert_eq!(
        message,
        "Parquet error: Root schema does not contain projection"
    );
}
1483
#[test]
fn test_row_group_rows_invalid_projection() {
    // Same invalid projection as the file-level test, but applied through the
    // row-group reader of row group 0.
    let projection = parse_message_type(
        "
      message spark_schema {
        REQUIRED INT32 key;
        REQUIRED BOOLEAN value;
      }
    ",
    )
    .unwrap();
    let outcome = test_row_group_rows("nested_maps.snappy.parquet", Some(projection));
    let message = outcome.unwrap_err().to_string();
    assert_eq!(
        message,
        "Parquet error: Root schema does not contain projection"
    );
}
1499
#[test]
fn test_file_reader_rows_nested_map_type() {
    // The inner map is projected with only its `key` column; the test only checks
    // that reading all rows with this projection succeeds.
    let projection = parse_message_type(
        "
      message spark_schema {
        OPTIONAL group a (MAP) {
          REPEATED group key_value {
            REQUIRED BYTE_ARRAY key (UTF8);
            OPTIONAL group value (MAP) {
              REPEATED group key_value {
                REQUIRED INT32 key;
              }
            }
          }
        }
      }
    ",
    )
    .unwrap();
    let _rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(projection)).unwrap();
}
1519
#[test]
fn test_file_reader_iter() {
    // Iterates an owning `RowIter` and gathers the integer stored in column 0 of every row.
    let file_path = get_test_path("alltypes_plain.parquet");
    let file_reader = SerializedFileReader::try_from(file_path.as_path()).unwrap();

    let mut ids = Vec::new();
    for record in RowIter::from_file_into(Box::new(file_reader)) {
        // A row that fails to decode panics; a failed `get_int` is skipped.
        if let Ok(id) = record.unwrap().get_int(0) {
            ids.push(id);
        }
    }
    assert_eq!(ids, &[4, 5, 6, 7, 2, 3, 0, 1]);
}
1529
#[test]
fn test_file_reader_iter_projection() {
    // Same as `test_file_reader_iter`, but with the iterator projected down to `id`.
    let file_path = get_test_path("alltypes_plain.parquet");
    let file_reader = SerializedFileReader::try_from(file_path.as_path()).unwrap();
    let projection = parse_message_type("message schema { OPTIONAL INT32 id; }").ok();

    let projected = RowIter::from_file_into(Box::new(file_reader))
        .project(projection)
        .unwrap();

    let mut ids = Vec::new();
    for record in projected {
        // A row that fails to decode panics; a failed `get_int` is skipped.
        if let Ok(id) = record.unwrap().get_int(0) {
            ids.push(id);
        }
    }

    assert_eq!(ids, &[4, 5, 6, 7, 2, 3, 0, 1]);
}
1544
#[test]
fn test_file_reader_iter_projection_err() {
    // Projecting an owning `RowIter` onto fields the test expects to be rejected
    // must yield the "does not contain projection" error from `project`.
    let file_path = get_test_path("nested_maps.snappy.parquet");
    let file_reader = SerializedFileReader::try_from(file_path.as_path()).unwrap();
    let projection = parse_message_type(
        "
      message spark_schema {
        REQUIRED INT32 key;
        REQUIRED BOOLEAN value;
      }
    ",
    )
    .ok();

    let outcome = RowIter::from_file_into(Box::new(file_reader)).project(projection);
    let message = outcome.err().unwrap().to_string();

    assert_eq!(
        message,
        "Parquet error: Root schema does not contain projection"
    );
}
1563
#[test]
fn test_tree_reader_handle_repeated_fields_with_no_annotation() {
    // Reads `repeated_no_annotation.parquet` without a projection and compares the
    // six decoded rows against the expected `id` / `phoneNumbers` structure.
    let actual = test_file_reader_rows("repeated_no_annotation.parquet", None).unwrap();

    // Builders for the recurring pieces of the expected rows.
    let id = |value: i32| ("id".to_string(), Field::Int(value));
    let no_phone_numbers = || ("phoneNumbers".to_string(), Field::Null);
    let phone_numbers = |phones: Field| {
        (
            "phoneNumbers".to_string(),
            group![("phone".to_string(), phones)],
        )
    };
    let phone = |number: i64, kind: Field| {
        group![
            ("number".to_string(), Field::Long(number)),
            ("kind".to_string(), kind)
        ]
    };
    let kind = |label: &str| Field::Str(label.to_string());

    let expected = vec![
        row![id(1), no_phone_numbers()],
        row![id(2), no_phone_numbers()],
        row![id(3), phone_numbers(list![])],
        row![
            id(4),
            phone_numbers(list![phone(5555555555, Field::Null)])
        ],
        row![
            id(5),
            phone_numbers(list![phone(1111111111, kind("home"))])
        ],
        row![
            id(6),
            phone_numbers(list![
                phone(1111111111, kind("home")),
                phone(2222222222, Field::Null),
                phone(3333333333, kind("mobile"))
            ])
        ],
    ];

    assert_eq!(actual, expected);
}
1638
#[test]
fn test_tree_reader_handle_nested_repeated_fields_with_no_annotation() {
    // Writes a small in-memory file with two levels of un-annotated repeated groups,
    // then reads it back through the row API.
    let schema = Arc::new(
        parse_message_type(
            "
        message schema {
            REPEATED group level1 {
                REPEATED group level2 {
                    REQUIRED group level3 {
                        REQUIRED INT64 value3;
                    }
                }
                REQUIRED INT64 value1;
            }
        }",
        )
        .unwrap(),
    );

    let mut buffer: Vec<u8> = Vec::new();
    let mut file_writer =
        SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
    let mut row_group_writer = file_writer.next_row_group().unwrap();

    // (values, definition levels) per leaf column, in the order the leaves appear in
    // the schema: `value3` first, then `value1`. Every value uses repetition level 0.
    let columns: [(&[i64], &[i16]); 2] = [
        (&[30, 31, 32], &[2, 2, 2]),
        (&[10, 11, 12], &[1, 1, 1]),
    ];
    for (values, def_levels) in columns {
        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
        column_writer
            .typed::<Int64Type>()
            .write_batch(values, Some(def_levels), Some(&[0, 0, 0]))
            .unwrap();
        column_writer.close().unwrap();
    }

    row_group_writer.close().unwrap();
    file_writer.close().unwrap();
    assert_eq!(&buffer[..4], b"PAR1");

    let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
    let actual: Vec<_> = file_reader
        .get_row_iter(None)
        .unwrap()
        .map(|record| record.unwrap())
        .collect();

    // Each expected row holds one `level1` entry wrapping one `level2` entry.
    let expected_row = |value3: i64, value1: i64| {
        row![(
            "level1".to_string(),
            list![group![
                (
                    "level2".to_string(),
                    list![group![(
                        "level3".to_string(),
                        group![("value3".to_string(), Field::Long(value3))]
                    )]]
                ),
                ("value1".to_string(), Field::Long(value1))
            ]]
        )]
    };
    let expected = vec![
        expected_row(30, 10),
        expected_row(31, 11),
        expected_row(32, 12),
    ];

    assert_eq!(actual, expected);
}
1738
#[test]
fn test_tree_reader_handle_primitive_repeated_fields_with_no_annotation() {
    /// Expected list field built from plain integers.
    fn int_list(values: &[i32]) -> Field {
        Field::ListInternal(make_list(
            values.iter().copied().map(Field::Int).collect(),
        ))
    }

    /// Expected list field built from string slices.
    fn str_list(values: &[&str]) -> Field {
        Field::ListInternal(make_list(
            values.iter().map(|s| Field::Str(s.to_string())).collect(),
        ))
    }

    /// Expected row: the same two lists appear at the top level and inside
    /// `group_of_lists`.
    fn expected_row(ints: &[i32], strs: &[&str]) -> Row {
        row![
            ("Int32_list".to_string(), int_list(ints)),
            ("String_list".to_string(), str_list(strs)),
            (
                "group_of_lists".to_string(),
                group![
                    ("Int32_list_in_group".to_string(), int_list(ints)),
                    ("String_list_in_group".to_string(), str_list(strs))
                ]
            )
        ]
    }

    let actual = test_file_reader_rows("repeated_primitive_no_list.parquet", None).unwrap();
    let expected = vec![
        expected_row(&[0, 1, 2, 3], &["foo", "zero", "one", "two"]),
        expected_row(&[], &["three"]),
        expected_row(&[4], &["four"]),
        expected_row(&[5, 6, 7, 8], &["five", "six", "seven", "eight"]),
    ];
    assert_eq!(actual, expected);
}
1863
#[test]
fn test_map_no_value() {
    // For every row of `map_no_value.parquet`, the values decoded for columns 1 and 2
    // must be equal.
    let rows = test_file_reader_rows("map_no_value.parquet", None).unwrap();

    rows.into_iter()
        .map(|row| row.into_columns())
        .for_each(|columns| assert_eq!(columns[1].1, columns[2].1));
}
1893
1894 fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
1895 let file = get_test_file(file_name);
1896 let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
1897 let iter = file_reader.get_row_iter(schema)?;
1898 Ok(iter.map(|row| row.unwrap()).collect())
1899 }
1900
1901 fn test_row_group_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
1902 let file = get_test_file(file_name);
1903 let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
1904 let row_group_reader = file_reader.get_row_group(0).unwrap();
1907 let iter = row_group_reader.get_row_iter(schema)?;
1908 Ok(iter.map(|row| row.unwrap()).collect())
1909 }
1910
#[test]
fn test_read_old_nested_list() {
    // `old_list_structure.parquet` is expected to decode to a single row whose column
    // `a` is a list of two integer lists.
    let actual = test_file_reader_rows("old_list_structure.parquet", None).unwrap();

    let inner = |values: [i32; 2]| Field::ListInternal(make_list(values.map(Field::Int).to_vec()));
    let outer = Field::ListInternal(make_list(vec![inner([1, 2]), inner([3, 4])]));

    let expected = vec![row![("a".to_string(), outer)]];
    assert_eq!(actual, expected);
}
1927
1928 fn assert_err_on_overcount(file_name: &str, proj_schema: Option<Type>) {
1929 let file = get_test_file(file_name);
1930 let file_reader = SerializedFileReader::new(file).unwrap();
1931 let metadata = file_reader.metadata();
1932 let row_group_reader = file_reader.get_row_group(0).unwrap();
1933 let actual_rows = row_group_reader.metadata().num_rows() as usize;
1934
1935 let descr = match proj_schema {
1936 Some(schema) => Arc::new(SchemaDescriptor::new(Arc::new(schema))),
1937 None => metadata.file_metadata().schema_descr_ptr(),
1938 };
1939 let reader = TreeBuilder::new().build(descr, &*row_group_reader).unwrap();
1940 let iter = ReaderIter::new(reader, actual_rows + 1).unwrap();
1941
1942 let rows: Vec<Result<Row>> = iter.collect();
1943 assert_eq!(rows.len(), actual_rows + 1);
1944 for row in &rows[..actual_rows] {
1945 assert!(row.is_ok(), "Expected Ok row, got: {:?}", row);
1946 }
1947 let err = rows[actual_rows].as_ref().unwrap_err();
1948 assert!(
1949 err.to_string().contains("Unexpected end of column data"),
1950 "Unexpected error message: {}",
1951 err
1952 );
1953 }
1954
#[test]
fn test_reader_iter_returns_error_when_num_records_exceeds_data() {
    // Over-count check against the file's full schema (no projection).
    let file_name = "nulls.snappy.parquet";
    assert_err_on_overcount(file_name, None);
}
1959
#[test]
fn test_reader_iter_returns_error_for_repeated_field_when_num_records_exceeds_data() {
    // Over-count check against a file read with its full schema (no projection).
    let file_name = "repeated_primitive_no_list.parquet";
    assert_err_on_overcount(file_name, None);
}
1964
#[test]
fn test_reader_iter_returns_error_for_map_field_when_num_records_exceeds_data() {
    // Over-count check with the reader tree built from a projection that contains
    // only the `my_map` MAP column.
    let projection = parse_message_type(
        "message schema {
            REQUIRED group my_map (MAP) {
                REPEATED group key_value {
                    REQUIRED INT32 key;
                    OPTIONAL INT32 value;
                }
            }
        }",
    )
    .unwrap();
    let file_name = "map_no_value.parquet";
    assert_err_on_overcount(file_name, Some(projection));
}
1980}