// parquet/record/reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains implementation of record assembly and converting Parquet types into
//! [`Row`]s.

21use std::{collections::HashMap, fmt, sync::Arc};
22
23use crate::basic::{ConvertedType, Repetition};
24use crate::errors::{ParquetError, Result};
25use crate::file::reader::{FileReader, RowGroupReader};
26use crate::record::{
27    api::{make_list, make_map, Field, Row},
28    triplet::TripletIter,
29};
30use crate::schema::types::{ColumnPath, SchemaDescPtr, SchemaDescriptor, Type, TypePtr};
31
/// Batch size used by [`TreeBuilder::new`] for the triplet iterator of every leaf
/// column, unless overridden with `with_batch_size` (1024 values).
const DEFAULT_BATCH_SIZE: usize = 1 << 10;

/// Builder of the `Reader` tree for a row group.
///
/// Holds the options applied while building a reader tree, and can also hand out an
/// iterator over the records of a row group directly (see `TreeBuilder::as_iter`).
/// It is used internally by `RowIter` for every row group it opens.
pub struct TreeBuilder {
    /// Batch size (expected to be >= 1) handed to the triplet iterator of each leaf column.
    batch_size: usize,
}

43impl Default for TreeBuilder {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49impl TreeBuilder {
50    /// Creates new tree builder with default parameters.
51    pub fn new() -> Self {
52        Self {
53            batch_size: DEFAULT_BATCH_SIZE,
54        }
55    }
56
57    /// Sets batch size for this tree builder.
58    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
59        self.batch_size = batch_size;
60        self
61    }
62
63    /// Creates new root reader for provided schema and row group.
64    pub fn build(
65        &self,
66        descr: SchemaDescPtr,
67        row_group_reader: &dyn RowGroupReader,
68    ) -> Result<Reader> {
69        // Prepare lookup table of column path -> original column index
70        // This allows to prune columns and map schema leaf nodes to the column readers
71        let mut paths: HashMap<ColumnPath, usize> = HashMap::new();
72        let row_group_metadata = row_group_reader.metadata();
73
74        for col_index in 0..row_group_reader.num_columns() {
75            let col_meta = row_group_metadata.column(col_index);
76            let col_path = col_meta.column_path().clone();
77            paths.insert(col_path, col_index);
78        }
79
80        // Build child readers for the message type
81        let mut readers = Vec::new();
82        let mut path = Vec::new();
83
84        for field in descr.root_schema().get_fields() {
85            let reader =
86                self.reader_tree(field.clone(), &mut path, 0, 0, &paths, row_group_reader)?;
87            readers.push(reader);
88        }
89
90        // Return group reader for message type,
91        // it is always required with definition level 0
92        Ok(Reader::GroupReader(None, 0, readers))
93    }
94
95    /// Creates iterator of `Row`s directly from schema descriptor and row group.
96    pub fn as_iter(
97        &self,
98        descr: SchemaDescPtr,
99        row_group_reader: &dyn RowGroupReader,
100    ) -> Result<ReaderIter> {
101        let num_records = row_group_reader.metadata().num_rows() as usize;
102        ReaderIter::new(self.build(descr, row_group_reader)?, num_records)
103    }
104
105    /// Builds tree of readers for the current schema recursively.
106    fn reader_tree(
107        &self,
108        field: TypePtr,
109        path: &mut Vec<String>,
110        mut curr_def_level: i16,
111        mut curr_rep_level: i16,
112        paths: &HashMap<ColumnPath, usize>,
113        row_group_reader: &dyn RowGroupReader,
114    ) -> Result<Reader> {
115        assert!(field.get_basic_info().has_repetition());
116        // Update current definition and repetition levels for this type
117        let repetition = field.get_basic_info().repetition();
118        match repetition {
119            Repetition::OPTIONAL => {
120                curr_def_level += 1;
121            }
122            Repetition::REPEATED => {
123                curr_def_level += 1;
124                curr_rep_level += 1;
125            }
126            _ => {}
127        }
128
129        path.push(String::from(field.name()));
130        let reader = if field.is_primitive() {
131            let col_path = ColumnPath::new(path.to_vec());
132            let orig_index = *paths
133                .get(&col_path)
134                .ok_or(general_err!("Path {:?} not found", col_path))?;
135            let col_descr = row_group_reader
136                .metadata()
137                .column(orig_index)
138                .column_descr_ptr();
139            let col_reader = row_group_reader.get_column_reader(orig_index)?;
140            let column = TripletIter::new(col_descr, col_reader, self.batch_size);
141            let reader = Reader::PrimitiveReader(field.clone(), Box::new(column));
142            if repetition == Repetition::REPEATED {
143                Reader::RepeatedReader(
144                    field,
145                    curr_def_level - 1,
146                    curr_rep_level - 1,
147                    Box::new(reader),
148                )
149            } else {
150                reader
151            }
152        } else {
153            match field.get_basic_info().converted_type() {
154                // List types
155                ConvertedType::LIST => {
156                    assert_eq!(field.get_fields().len(), 1, "Invalid list type {field:?}");
157
158                    let repeated_field = field.get_fields()[0].clone();
159                    assert_eq!(
160                        repeated_field.get_basic_info().repetition(),
161                        Repetition::REPEATED,
162                        "Invalid list type {field:?}"
163                    );
164
165                    if Reader::is_element_type(&repeated_field) {
166                        // Support for backward compatible lists
167                        let reader = self.reader_tree(
168                            repeated_field,
169                            path,
170                            curr_def_level,
171                            curr_rep_level,
172                            paths,
173                            row_group_reader,
174                        )?;
175
176                        Reader::RepeatedReader(
177                            field,
178                            curr_def_level,
179                            curr_rep_level,
180                            Box::new(reader),
181                        )
182                    } else {
183                        let child_field = repeated_field.get_fields()[0].clone();
184
185                        path.push(String::from(repeated_field.name()));
186
187                        let reader = self.reader_tree(
188                            child_field,
189                            path,
190                            curr_def_level + 1,
191                            curr_rep_level + 1,
192                            paths,
193                            row_group_reader,
194                        )?;
195
196                        path.pop();
197
198                        Reader::RepeatedReader(
199                            field,
200                            curr_def_level,
201                            curr_rep_level,
202                            Box::new(reader),
203                        )
204                    }
205                }
206                // Map types (key-value pairs)
207                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
208                    assert_eq!(field.get_fields().len(), 1, "Invalid map type: {field:?}");
209                    assert!(
210                        !field.get_fields()[0].is_primitive(),
211                        "Invalid map type: {field:?}"
212                    );
213
214                    let key_value_type = field.get_fields()[0].clone();
215                    assert_eq!(
216                        key_value_type.get_basic_info().repetition(),
217                        Repetition::REPEATED,
218                        "Invalid map type: {field:?}"
219                    );
220                    // Parquet spec allows no value. In that case treat as a list. #1642
221                    if key_value_type.get_fields().len() != 1 {
222                        // If not a list, then there can only be 2 fields in the struct
223                        assert_eq!(
224                            key_value_type.get_fields().len(),
225                            2,
226                            "Invalid map type: {field:?}"
227                        );
228                    }
229
230                    path.push(String::from(key_value_type.name()));
231
232                    let key_type = &key_value_type.get_fields()[0];
233                    assert!(
234                        key_type.is_primitive(),
235                        "Map key type is expected to be a primitive type, but found {key_type:?}"
236                    );
237                    let key_reader = self.reader_tree(
238                        key_type.clone(),
239                        path,
240                        curr_def_level + 1,
241                        curr_rep_level + 1,
242                        paths,
243                        row_group_reader,
244                    )?;
245
246                    if key_value_type.get_fields().len() == 1 {
247                        path.pop();
248                        Reader::RepeatedReader(
249                            field,
250                            curr_def_level,
251                            curr_rep_level,
252                            Box::new(key_reader),
253                        )
254                    } else {
255                        let value_type = &key_value_type.get_fields()[1];
256                        let value_reader = self.reader_tree(
257                            value_type.clone(),
258                            path,
259                            curr_def_level + 1,
260                            curr_rep_level + 1,
261                            paths,
262                            row_group_reader,
263                        )?;
264
265                        path.pop();
266
267                        Reader::KeyValueReader(
268                            field,
269                            curr_def_level,
270                            curr_rep_level,
271                            Box::new(key_reader),
272                            Box::new(value_reader),
273                        )
274                    }
275                }
276                // A repeated field that is neither contained by a `LIST`- or
277                // `MAP`-annotated group nor annotated by `LIST` or `MAP`
278                // should be interpreted as a required list of required
279                // elements where the element type is the type of the field.
280                _ if repetition == Repetition::REPEATED => {
281                    let required_field = Type::group_type_builder(field.name())
282                        .with_repetition(Repetition::REQUIRED)
283                        .with_converted_type(field.get_basic_info().converted_type())
284                        .with_fields(field.get_fields().to_vec())
285                        .build()?;
286
287                    path.pop();
288
289                    let reader = self.reader_tree(
290                        Arc::new(required_field),
291                        path,
292                        curr_def_level,
293                        curr_rep_level,
294                        paths,
295                        row_group_reader,
296                    )?;
297
298                    return Ok(Reader::RepeatedReader(
299                        field,
300                        curr_def_level - 1,
301                        curr_rep_level - 1,
302                        Box::new(reader),
303                    ));
304                }
305                // Group types (structs)
306                _ => {
307                    let mut readers = Vec::new();
308                    for child in field.get_fields() {
309                        let reader = self.reader_tree(
310                            child.clone(),
311                            path,
312                            curr_def_level,
313                            curr_rep_level,
314                            paths,
315                            row_group_reader,
316                        )?;
317                        readers.push(reader);
318                    }
319                    Reader::GroupReader(Some(field), curr_def_level, readers)
320                }
321            }
322        };
323        path.pop();
324
325        Ok(Reader::option(repetition, curr_def_level, reader))
326    }
327}
328
329/// Reader tree for record assembly
330pub enum Reader {
331    /// Primitive reader with type information and triplet iterator
332    PrimitiveReader(TypePtr, Box<TripletIter>),
333    /// Optional reader with definition level of a parent and a reader
334    OptionReader(i16, Box<Reader>),
335    /// Group (struct) reader with type information, definition level and list of child
336    /// readers. When it represents message type, type information is None
337    GroupReader(Option<TypePtr>, i16, Vec<Reader>),
338    /// Reader for repeated values, e.g. lists, contains type information, definition
339    /// level, repetition level and a child reader
340    RepeatedReader(TypePtr, i16, i16, Box<Reader>),
341    /// Reader of key-value pairs, e.g. maps, contains type information, definition
342    /// level, repetition level, child reader for keys and child reader for values
343    KeyValueReader(TypePtr, i16, i16, Box<Reader>, Box<Reader>),
344}
345
346impl Reader {
347    /// Wraps reader in option reader based on repetition.
348    fn option(repetition: Repetition, def_level: i16, reader: Reader) -> Self {
349        if repetition == Repetition::OPTIONAL {
350            Reader::OptionReader(def_level - 1, Box::new(reader))
351        } else {
352            reader
353        }
354    }
355
356    /// Returns true if repeated type is an element type for the list.
357    /// Used to determine legacy list types.
358    /// This method is copied from Spark Parquet reader and is based on the reference:
359    /// <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
360    ///   #backward-compatibility-rules
361    fn is_element_type(repeated_type: &Type) -> bool {
362        // For legacy 2-level list types whose element type is a 2-level list
363        //
364        //    // ARRAY<ARRAY<INT>> (nullable list, non-null elements)
365        //    optional group my_list (LIST) {
366        //      repeated group array (LIST) {
367        //        repeated int32 array;
368        //      };
369        //    }
370        //
371        if repeated_type.is_list() || repeated_type.has_single_repeated_child() {
372            return false;
373        }
374
375        // For legacy 2-level list types with primitive element type, e.g.:
376        //
377        //    // ARRAY<INT> (nullable list, non-null elements)
378        //    optional group my_list (LIST) {
379        //      repeated int32 element;
380        //    }
381        //
382        repeated_type.is_primitive() ||
383    // For legacy 2-level list types whose element type is a group type with 2 or more
384    // fields, e.g.:
385    //
386    //    // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
387    //    optional group my_list (LIST) {
388    //      repeated group element {
389    //        required binary str (UTF8);
390    //        required int32 num;
391    //      };
392    //    }
393    //
394    repeated_type.is_group() && repeated_type.get_fields().len() > 1 ||
395    // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0),
396    // e.g.:
397    //
398    //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
399    //    optional group my_list (LIST) {
400    //      repeated group array {
401    //        required binary str (UTF8);
402    //      };
403    //    }
404    //
405    repeated_type.name() == "array" ||
406    // For Parquet data generated by parquet-thrift, e.g.:
407    //
408    //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
409    //    optional group my_list (LIST) {
410    //      repeated group my_list_tuple {
411    //        required binary str (UTF8);
412    //      };
413    //    }
414    //
415    repeated_type.name().ends_with("_tuple")
416    }
417
418    /// Reads current record as `Row` from the reader tree.
419    /// Automatically advances all necessary readers.
420    /// This must be called on the root level reader (i.e., for Message type).
421    /// Otherwise, it will panic.
422    fn read(&mut self) -> Result<Row> {
423        match *self {
424            Reader::GroupReader(_, _, ref mut readers) => {
425                let mut fields = Vec::new();
426                for reader in readers {
427                    fields.push((String::from(reader.field_name()), reader.read_field()?));
428                }
429                Ok(Row::new(fields))
430            }
431            _ => panic!("Cannot call read() on {self}"),
432        }
433    }
434
435    /// Reads current record as `Field` from the reader tree.
436    /// Automatically advances all necessary readers.
437    fn read_field(&mut self) -> Result<Field> {
438        let field = match *self {
439            Reader::PrimitiveReader(_, ref mut column) => {
440                let value = column.current_value()?;
441                column.read_next()?;
442                value
443            }
444            Reader::OptionReader(def_level, ref mut reader) => {
445                if reader.current_def_level() > def_level {
446                    reader.read_field()?
447                } else {
448                    reader.advance_columns()?;
449                    Field::Null
450                }
451            }
452            Reader::GroupReader(_, def_level, ref mut readers) => {
453                let mut fields = Vec::new();
454                for reader in readers {
455                    if reader.repetition() != Repetition::OPTIONAL
456                        || reader.current_def_level() > def_level
457                    {
458                        fields.push((String::from(reader.field_name()), reader.read_field()?));
459                    } else {
460                        reader.advance_columns()?;
461                        fields.push((String::from(reader.field_name()), Field::Null));
462                    }
463                }
464                let row = Row::new(fields);
465                Field::Group(row)
466            }
467            Reader::RepeatedReader(_, def_level, rep_level, ref mut reader) => {
468                let mut elements = Vec::new();
469                loop {
470                    if reader.current_def_level() > def_level {
471                        elements.push(reader.read_field()?);
472                    } else {
473                        reader.advance_columns()?;
474                        // If the current definition level is equal to the definition
475                        // level of this repeated type, then the
476                        // result is an empty list and the repetition level
477                        // will always be <= rl.
478                        break;
479                    }
480
481                    // This covers case when we are out of repetition levels and should
482                    // close the group, or there are no values left to
483                    // buffer.
484                    if !reader.has_next() || reader.current_rep_level() <= rep_level {
485                        break;
486                    }
487                }
488                Field::ListInternal(make_list(elements))
489            }
490            Reader::KeyValueReader(_, def_level, rep_level, ref mut keys, ref mut values) => {
491                let mut pairs = Vec::new();
492                loop {
493                    if keys.current_def_level() > def_level {
494                        pairs.push((keys.read_field()?, values.read_field()?));
495                    } else {
496                        keys.advance_columns()?;
497                        values.advance_columns()?;
498                        // If the current definition level is equal to the definition
499                        // level of this repeated type, then the
500                        // result is an empty list and the repetition level
501                        // will always be <= rl.
502                        break;
503                    }
504
505                    // This covers case when we are out of repetition levels and should
506                    // close the group, or there are no values left to
507                    // buffer.
508                    if !keys.has_next() || keys.current_rep_level() <= rep_level {
509                        break;
510                    }
511                }
512
513                Field::MapInternal(make_map(pairs))
514            }
515        };
516        Ok(field)
517    }
518
519    /// Returns field name for the current reader.
520    fn field_name(&self) -> &str {
521        match *self {
522            Reader::PrimitiveReader(ref field, _) => field.name(),
523            Reader::OptionReader(_, ref reader) => reader.field_name(),
524            Reader::GroupReader(ref opt, ..) => match opt {
525                Some(ref field) => field.name(),
526                None => panic!("Field is None for group reader"),
527            },
528            Reader::RepeatedReader(ref field, ..) => field.name(),
529            Reader::KeyValueReader(ref field, ..) => field.name(),
530        }
531    }
532
533    /// Returns repetition for the current reader.
534    fn repetition(&self) -> Repetition {
535        match *self {
536            Reader::PrimitiveReader(ref field, _) => field.get_basic_info().repetition(),
537            Reader::OptionReader(_, ref reader) => reader.repetition(),
538            Reader::GroupReader(ref opt, ..) => match opt {
539                Some(ref field) => field.get_basic_info().repetition(),
540                None => panic!("Field is None for group reader"),
541            },
542            Reader::RepeatedReader(ref field, ..) => field.get_basic_info().repetition(),
543            Reader::KeyValueReader(ref field, ..) => field.get_basic_info().repetition(),
544        }
545    }
546
547    /// Returns true, if current reader has more values, false otherwise.
548    /// Method does not advance internal iterator.
549    fn has_next(&self) -> bool {
550        match *self {
551            Reader::PrimitiveReader(_, ref column) => column.has_next(),
552            Reader::OptionReader(_, ref reader) => reader.has_next(),
553            Reader::GroupReader(_, _, ref readers) => readers.first().unwrap().has_next(),
554            Reader::RepeatedReader(_, _, _, ref reader) => reader.has_next(),
555            Reader::KeyValueReader(_, _, _, ref keys, _) => keys.has_next(),
556        }
557    }
558
559    /// Returns current definition level,
560    /// Method does not advance internal iterator.
561    fn current_def_level(&self) -> i16 {
562        match *self {
563            Reader::PrimitiveReader(_, ref column) => column.current_def_level(),
564            Reader::OptionReader(_, ref reader) => reader.current_def_level(),
565            Reader::GroupReader(_, _, ref readers) => match readers.first() {
566                Some(reader) => reader.current_def_level(),
567                None => panic!("Current definition level: empty group reader"),
568            },
569            Reader::RepeatedReader(_, _, _, ref reader) => reader.current_def_level(),
570            Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_def_level(),
571        }
572    }
573
574    /// Returns current repetition level.
575    /// Method does not advance internal iterator.
576    fn current_rep_level(&self) -> i16 {
577        match *self {
578            Reader::PrimitiveReader(_, ref column) => column.current_rep_level(),
579            Reader::OptionReader(_, ref reader) => reader.current_rep_level(),
580            Reader::GroupReader(_, _, ref readers) => match readers.first() {
581                Some(reader) => reader.current_rep_level(),
582                None => panic!("Current repetition level: empty group reader"),
583            },
584            Reader::RepeatedReader(_, _, _, ref reader) => reader.current_rep_level(),
585            Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_rep_level(),
586        }
587    }
588
589    /// Advances leaf columns for the current reader.
590    fn advance_columns(&mut self) -> Result<()> {
591        match *self {
592            Reader::PrimitiveReader(_, ref mut column) => column.read_next().map(|_| ()),
593            Reader::OptionReader(_, ref mut reader) => reader.advance_columns(),
594            Reader::GroupReader(_, _, ref mut readers) => {
595                for reader in readers {
596                    reader.advance_columns()?;
597                }
598                Ok(())
599            }
600            Reader::RepeatedReader(_, _, _, ref mut reader) => reader.advance_columns(),
601            Reader::KeyValueReader(_, _, _, ref mut keys, ref mut values) => {
602                keys.advance_columns()?;
603                values.advance_columns()
604            }
605        }
606    }
607}
608
609impl fmt::Display for Reader {
610    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
611        let s = match self {
612            Reader::PrimitiveReader(..) => "PrimitiveReader",
613            Reader::OptionReader(..) => "OptionReader",
614            Reader::GroupReader(..) => "GroupReader",
615            Reader::RepeatedReader(..) => "RepeatedReader",
616            Reader::KeyValueReader(..) => "KeyValueReader",
617        };
618        write!(f, "{s}")
619    }
620}
621
// ----------------------------------------------------------------------
// Row iterators

625/// The enum Either with variants That represents a reference and a box of
626/// [`FileReader`].
627enum Either<'a> {
628    Left(&'a dyn FileReader),
629    Right(Box<dyn FileReader>),
630}
631
632impl Either<'_> {
633    fn reader(&self) -> &dyn FileReader {
634        match *self {
635            Either::Left(r) => r,
636            Either::Right(ref r) => &**r,
637        }
638    }
639}
640
641/// Access parquet data as an iterator of [`Row`]
642///
643/// # Caveats
644///
645/// Parquet stores data in a columnar fashion using [Dremel] encoding, and is therefore highly
646/// optimised for reading data by column, not row. As a consequence applications concerned with
647/// performance should prefer the columnar arrow or [ColumnReader] APIs.
648///
649/// Additionally the current implementation does not correctly handle repeated fields ([#2394]),
650/// and workloads looking to handle such schema should use the other APIs.
651///
652/// [#2394]: https://github.com/apache/arrow-rs/issues/2394
653/// [ColumnReader]: crate::file::reader::RowGroupReader::get_column_reader
654/// [Dremel]: https://research.google/pubs/pub36632/
655pub struct RowIter<'a> {
656    descr: SchemaDescPtr,
657    tree_builder: TreeBuilder,
658    file_reader: Option<Either<'a>>,
659    current_row_group: usize,
660    num_row_groups: usize,
661    row_iter: Option<ReaderIter>,
662}
663
664impl<'a> RowIter<'a> {
665    /// Creates a new iterator of [`Row`]s.
666    fn new(
667        file_reader: Option<Either<'a>>,
668        row_iter: Option<ReaderIter>,
669        descr: SchemaDescPtr,
670    ) -> Self {
671        let tree_builder = Self::tree_builder();
672        let num_row_groups = match file_reader {
673            Some(ref r) => r.reader().num_row_groups(),
674            None => 0,
675        };
676
677        Self {
678            descr,
679            file_reader,
680            tree_builder,
681            num_row_groups,
682            row_iter,
683            current_row_group: 0,
684        }
685    }
686
687    /// Creates iterator of [`Row`]s for all row groups in a
688    /// file.
689    pub fn from_file(proj: Option<Type>, reader: &'a dyn FileReader) -> Result<Self> {
690        let either = Either::Left(reader);
691        let descr =
692            Self::get_proj_descr(proj, reader.metadata().file_metadata().schema_descr_ptr())?;
693
694        Ok(Self::new(Some(either), None, descr))
695    }
696
697    /// Creates iterator of [`Row`]s for a specific row group.
698    pub fn from_row_group(proj: Option<Type>, reader: &'a dyn RowGroupReader) -> Result<Self> {
699        let descr = Self::get_proj_descr(proj, reader.metadata().schema_descr_ptr())?;
700        let tree_builder = Self::tree_builder();
701        let row_iter = tree_builder.as_iter(descr.clone(), reader)?;
702
703        // For row group we need to set `current_row_group` >= `num_row_groups`, because
704        // we only have one row group and can't buffer more.
705        Ok(Self::new(None, Some(row_iter), descr))
706    }
707
708    /// Creates a iterator of [`Row`]s from a [`FileReader`] using the full file schema.
709    pub fn from_file_into(reader: Box<dyn FileReader>) -> Self {
710        let either = Either::Right(reader);
711        let descr = either
712            .reader()
713            .metadata()
714            .file_metadata()
715            .schema_descr_ptr();
716
717        Self::new(Some(either), None, descr)
718    }
719
720    /// Tries to create a iterator of [`Row`]s using projections.
721    /// Returns a error if a file reader is not the source of this iterator.
722    ///
723    /// The Projected schema can be a subset of or equal to the file schema,
724    /// when it is None, full file schema is assumed.
725    pub fn project(self, proj: Option<Type>) -> Result<Self> {
726        match self.file_reader {
727            Some(ref either) => {
728                let schema = either
729                    .reader()
730                    .metadata()
731                    .file_metadata()
732                    .schema_descr_ptr();
733                let descr = Self::get_proj_descr(proj, schema)?;
734
735                Ok(Self::new(self.file_reader, None, descr))
736            }
737            None => Err(general_err!("File reader is required to use projections")),
738        }
739    }
740
741    /// Helper method to get schema descriptor for projected schema.
742    /// If projection is None, then full schema is returned.
743    #[inline]
744    fn get_proj_descr(proj: Option<Type>, root_descr: SchemaDescPtr) -> Result<SchemaDescPtr> {
745        match proj {
746            Some(projection) => {
747                // check if projection is part of file schema
748                let root_schema = root_descr.root_schema();
749                if !root_schema.check_contains(&projection) {
750                    return Err(general_err!("Root schema does not contain projection"));
751                }
752                Ok(Arc::new(SchemaDescriptor::new(Arc::new(projection))))
753            }
754            None => Ok(root_descr),
755        }
756    }
757
758    /// Sets batch size for this row iter.
759    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
760        self.tree_builder = self.tree_builder.with_batch_size(batch_size);
761        self
762    }
763
764    /// Returns common tree builder, so the same settings are applied to both iterators
765    /// from file reader and row group.
766    #[inline]
767    fn tree_builder() -> TreeBuilder {
768        TreeBuilder::new()
769    }
770}
771
772impl Iterator for RowIter<'_> {
773    type Item = Result<Row>;
774
775    fn next(&mut self) -> Option<Result<Row>> {
776        let mut row = None;
777        if let Some(ref mut iter) = self.row_iter {
778            row = iter.next();
779        }
780
781        while row.is_none() && self.current_row_group < self.num_row_groups {
782            // We do not expect any failures when accessing a row group, and file reader
783            // must be set for selecting next row group.
784            if let Some(ref either) = self.file_reader {
785                let file_reader = either.reader();
786                let row_group_reader = &*file_reader
787                    .get_row_group(self.current_row_group)
788                    .expect("Row group is required to advance");
789
790                match self
791                    .tree_builder
792                    .as_iter(self.descr.clone(), row_group_reader)
793                {
794                    Ok(mut iter) => {
795                        row = iter.next();
796
797                        self.current_row_group += 1;
798                        self.row_iter = Some(iter);
799                    }
800                    Err(e) => return Some(Err(e)),
801                }
802            }
803        }
804
805        row
806    }
807}
808
/// Internal iterator of [`Row`]s for a reader.
pub struct ReaderIter {
    // Root of the reader tree; `read()` is called on it once per yielded row.
    root_reader: Reader,
    // Number of rows this iterator will still yield before returning `None`.
    records_left: usize,
}
814
815impl ReaderIter {
816    fn new(mut root_reader: Reader, num_records: usize) -> Result<Self> {
817        // Prepare root reader by advancing all column vectors
818        root_reader.advance_columns()?;
819        Ok(Self {
820            root_reader,
821            records_left: num_records,
822        })
823    }
824}
825
826impl Iterator for ReaderIter {
827    type Item = Result<Row>;
828
829    fn next(&mut self) -> Option<Result<Row>> {
830        if self.records_left > 0 {
831            self.records_left -= 1;
832            Some(self.root_reader.read())
833        } else {
834            None
835        }
836    }
837}
838
839#[cfg(test)]
840mod tests {
841    use super::*;
842
843    use crate::data_type::Int64Type;
844    use crate::file::reader::SerializedFileReader;
845    use crate::file::writer::SerializedFileWriter;
846    use crate::record::api::RowAccessor;
847    use crate::schema::parser::parse_message_type;
848    use crate::util::test_common::file_util::{get_test_file, get_test_path};
849    use bytes::Bytes;
850
851    // Convenient macros to assemble row, list, map, and group.
852
853    macro_rules! row {
854        ($($e:tt)*) => {
855            {
856                Row::new(vec![$($e)*])
857            }
858        }
859    }
860
861    macro_rules! list {
862        ($($e:tt)*) => {
863            {
864                Field::ListInternal(make_list(vec![$($e)*]))
865            }
866        }
867    }
868
869    macro_rules! map {
870        ($($e:tt)*) => {
871            {
872                Field::MapInternal(make_map(vec![$($e)*]))
873            }
874        }
875    }
876
877    macro_rules! group {
878        ( $( $e:expr ), * ) => {
879            {
880                Field::Group(row!($( $e ), *))
881            }
882        }
883    }
884
885    #[test]
886    fn test_file_reader_rows_nulls() {
887        let rows = test_file_reader_rows("nulls.snappy.parquet", None).unwrap();
888        let expected_rows = vec![
889            row![(
890                "b_struct".to_string(),
891                group![("b_c_int".to_string(), Field::Null)]
892            )],
893            row![(
894                "b_struct".to_string(),
895                group![("b_c_int".to_string(), Field::Null)]
896            )],
897            row![(
898                "b_struct".to_string(),
899                group![("b_c_int".to_string(), Field::Null)]
900            )],
901            row![(
902                "b_struct".to_string(),
903                group![("b_c_int".to_string(), Field::Null)]
904            )],
905            row![(
906                "b_struct".to_string(),
907                group![("b_c_int".to_string(), Field::Null)]
908            )],
909            row![(
910                "b_struct".to_string(),
911                group![("b_c_int".to_string(), Field::Null)]
912            )],
913            row![(
914                "b_struct".to_string(),
915                group![("b_c_int".to_string(), Field::Null)]
916            )],
917            row![(
918                "b_struct".to_string(),
919                group![("b_c_int".to_string(), Field::Null)]
920            )],
921        ];
922        assert_eq!(rows, expected_rows);
923    }
924
925    #[test]
926    fn test_file_reader_rows_nonnullable() {
927        let rows = test_file_reader_rows("nonnullable.impala.parquet", None).unwrap();
928        let expected_rows = vec![row![
929            ("ID".to_string(), Field::Long(8)),
930            ("Int_Array".to_string(), list![Field::Int(-1)]),
931            (
932                "int_array_array".to_string(),
933                list![list![Field::Int(-1), Field::Int(-2)], list![]]
934            ),
935            (
936                "Int_Map".to_string(),
937                map![(Field::Str("k1".to_string()), Field::Int(-1))]
938            ),
939            (
940                "int_map_array".to_string(),
941                list![
942                    map![],
943                    map![(Field::Str("k1".to_string()), Field::Int(1))],
944                    map![],
945                    map![]
946                ]
947            ),
948            (
949                "nested_Struct".to_string(),
950                group![
951                    ("a".to_string(), Field::Int(-1)),
952                    ("B".to_string(), list![Field::Int(-1)]),
953                    (
954                        "c".to_string(),
955                        group![(
956                            "D".to_string(),
957                            list![list![group![
958                                ("e".to_string(), Field::Int(-1)),
959                                ("f".to_string(), Field::Str("nonnullable".to_string()))
960                            ]]]
961                        )]
962                    ),
963                    ("G".to_string(), map![])
964                ]
965            )
966        ]];
967        assert_eq!(rows, expected_rows);
968    }
969
970    #[test]
971    fn test_file_reader_rows_nullable() {
972        let rows = test_file_reader_rows("nullable.impala.parquet", None).unwrap();
973        let expected_rows = vec![
974            row![
975                ("id".to_string(), Field::Long(1)),
976                (
977                    "int_array".to_string(),
978                    list![Field::Int(1), Field::Int(2), Field::Int(3)]
979                ),
980                (
981                    "int_array_Array".to_string(),
982                    list![
983                        list![Field::Int(1), Field::Int(2)],
984                        list![Field::Int(3), Field::Int(4)]
985                    ]
986                ),
987                (
988                    "int_map".to_string(),
989                    map![
990                        (Field::Str("k1".to_string()), Field::Int(1)),
991                        (Field::Str("k2".to_string()), Field::Int(100))
992                    ]
993                ),
994                (
995                    "int_Map_Array".to_string(),
996                    list![map![(Field::Str("k1".to_string()), Field::Int(1))]]
997                ),
998                (
999                    "nested_struct".to_string(),
1000                    group![
1001                        ("A".to_string(), Field::Int(1)),
1002                        ("b".to_string(), list![Field::Int(1)]),
1003                        (
1004                            "C".to_string(),
1005                            group![(
1006                                "d".to_string(),
1007                                list![
1008                                    list![
1009                                        group![
1010                                            ("E".to_string(), Field::Int(10)),
1011                                            ("F".to_string(), Field::Str("aaa".to_string()))
1012                                        ],
1013                                        group![
1014                                            ("E".to_string(), Field::Int(-10)),
1015                                            ("F".to_string(), Field::Str("bbb".to_string()))
1016                                        ]
1017                                    ],
1018                                    list![group![
1019                                        ("E".to_string(), Field::Int(11)),
1020                                        ("F".to_string(), Field::Str("c".to_string()))
1021                                    ]]
1022                                ]
1023                            )]
1024                        ),
1025                        (
1026                            "g".to_string(),
1027                            map![(
1028                                Field::Str("foo".to_string()),
1029                                group![(
1030                                    "H".to_string(),
1031                                    group![("i".to_string(), list![Field::Double(1.1)])]
1032                                )]
1033                            )]
1034                        )
1035                    ]
1036                )
1037            ],
1038            row![
1039                ("id".to_string(), Field::Long(2)),
1040                (
1041                    "int_array".to_string(),
1042                    list![
1043                        Field::Null,
1044                        Field::Int(1),
1045                        Field::Int(2),
1046                        Field::Null,
1047                        Field::Int(3),
1048                        Field::Null
1049                    ]
1050                ),
1051                (
1052                    "int_array_Array".to_string(),
1053                    list![
1054                        list![Field::Null, Field::Int(1), Field::Int(2), Field::Null],
1055                        list![Field::Int(3), Field::Null, Field::Int(4)],
1056                        list![],
1057                        Field::Null
1058                    ]
1059                ),
1060                (
1061                    "int_map".to_string(),
1062                    map![
1063                        (Field::Str("k1".to_string()), Field::Int(2)),
1064                        (Field::Str("k2".to_string()), Field::Null)
1065                    ]
1066                ),
1067                (
1068                    "int_Map_Array".to_string(),
1069                    list![
1070                        map![
1071                            (Field::Str("k3".to_string()), Field::Null),
1072                            (Field::Str("k1".to_string()), Field::Int(1))
1073                        ],
1074                        Field::Null,
1075                        map![]
1076                    ]
1077                ),
1078                (
1079                    "nested_struct".to_string(),
1080                    group![
1081                        ("A".to_string(), Field::Null),
1082                        ("b".to_string(), list![Field::Null]),
1083                        (
1084                            "C".to_string(),
1085                            group![(
1086                                "d".to_string(),
1087                                list![
1088                                    list![
1089                                        group![
1090                                            ("E".to_string(), Field::Null),
1091                                            ("F".to_string(), Field::Null)
1092                                        ],
1093                                        group![
1094                                            ("E".to_string(), Field::Int(10)),
1095                                            ("F".to_string(), Field::Str("aaa".to_string()))
1096                                        ],
1097                                        group![
1098                                            ("E".to_string(), Field::Null),
1099                                            ("F".to_string(), Field::Null)
1100                                        ],
1101                                        group![
1102                                            ("E".to_string(), Field::Int(-10)),
1103                                            ("F".to_string(), Field::Str("bbb".to_string()))
1104                                        ],
1105                                        group![
1106                                            ("E".to_string(), Field::Null),
1107                                            ("F".to_string(), Field::Null)
1108                                        ]
1109                                    ],
1110                                    list![
1111                                        group![
1112                                            ("E".to_string(), Field::Int(11)),
1113                                            ("F".to_string(), Field::Str("c".to_string()))
1114                                        ],
1115                                        Field::Null
1116                                    ],
1117                                    list![],
1118                                    Field::Null
1119                                ]
1120                            )]
1121                        ),
1122                        (
1123                            "g".to_string(),
1124                            map![
1125                                (
1126                                    Field::Str("g1".to_string()),
1127                                    group![(
1128                                        "H".to_string(),
1129                                        group![(
1130                                            "i".to_string(),
1131                                            list![Field::Double(2.2), Field::Null]
1132                                        )]
1133                                    )]
1134                                ),
1135                                (
1136                                    Field::Str("g2".to_string()),
1137                                    group![("H".to_string(), group![("i".to_string(), list![])])]
1138                                ),
1139                                (Field::Str("g3".to_string()), Field::Null),
1140                                (
1141                                    Field::Str("g4".to_string()),
1142                                    group![(
1143                                        "H".to_string(),
1144                                        group![("i".to_string(), Field::Null)]
1145                                    )]
1146                                ),
1147                                (
1148                                    Field::Str("g5".to_string()),
1149                                    group![("H".to_string(), Field::Null)]
1150                                )
1151                            ]
1152                        )
1153                    ]
1154                )
1155            ],
1156            row![
1157                ("id".to_string(), Field::Long(3)),
1158                ("int_array".to_string(), list![]),
1159                ("int_array_Array".to_string(), list![Field::Null]),
1160                ("int_map".to_string(), map![]),
1161                ("int_Map_Array".to_string(), list![Field::Null, Field::Null]),
1162                (
1163                    "nested_struct".to_string(),
1164                    group![
1165                        ("A".to_string(), Field::Null),
1166                        ("b".to_string(), Field::Null),
1167                        ("C".to_string(), group![("d".to_string(), list![])]),
1168                        ("g".to_string(), map![])
1169                    ]
1170                )
1171            ],
1172            row![
1173                ("id".to_string(), Field::Long(4)),
1174                ("int_array".to_string(), Field::Null),
1175                ("int_array_Array".to_string(), list![]),
1176                ("int_map".to_string(), map![]),
1177                ("int_Map_Array".to_string(), list![]),
1178                (
1179                    "nested_struct".to_string(),
1180                    group![
1181                        ("A".to_string(), Field::Null),
1182                        ("b".to_string(), Field::Null),
1183                        ("C".to_string(), group![("d".to_string(), Field::Null)]),
1184                        ("g".to_string(), Field::Null)
1185                    ]
1186                )
1187            ],
1188            row![
1189                ("id".to_string(), Field::Long(5)),
1190                ("int_array".to_string(), Field::Null),
1191                ("int_array_Array".to_string(), Field::Null),
1192                ("int_map".to_string(), map![]),
1193                ("int_Map_Array".to_string(), Field::Null),
1194                (
1195                    "nested_struct".to_string(),
1196                    group![
1197                        ("A".to_string(), Field::Null),
1198                        ("b".to_string(), Field::Null),
1199                        ("C".to_string(), Field::Null),
1200                        (
1201                            "g".to_string(),
1202                            map![(
1203                                Field::Str("foo".to_string()),
1204                                group![(
1205                                    "H".to_string(),
1206                                    group![(
1207                                        "i".to_string(),
1208                                        list![Field::Double(2.2), Field::Double(3.3)]
1209                                    )]
1210                                )]
1211                            )]
1212                        )
1213                    ]
1214                )
1215            ],
1216            row![
1217                ("id".to_string(), Field::Long(6)),
1218                ("int_array".to_string(), Field::Null),
1219                ("int_array_Array".to_string(), Field::Null),
1220                ("int_map".to_string(), Field::Null),
1221                ("int_Map_Array".to_string(), Field::Null),
1222                ("nested_struct".to_string(), Field::Null)
1223            ],
1224            row![
1225                ("id".to_string(), Field::Long(7)),
1226                ("int_array".to_string(), Field::Null),
1227                (
1228                    "int_array_Array".to_string(),
1229                    list![Field::Null, list![Field::Int(5), Field::Int(6)]]
1230                ),
1231                (
1232                    "int_map".to_string(),
1233                    map![
1234                        (Field::Str("k1".to_string()), Field::Null),
1235                        (Field::Str("k3".to_string()), Field::Null)
1236                    ]
1237                ),
1238                ("int_Map_Array".to_string(), Field::Null),
1239                (
1240                    "nested_struct".to_string(),
1241                    group![
1242                        ("A".to_string(), Field::Int(7)),
1243                        (
1244                            "b".to_string(),
1245                            list![Field::Int(2), Field::Int(3), Field::Null]
1246                        ),
1247                        (
1248                            "C".to_string(),
1249                            group![(
1250                                "d".to_string(),
1251                                list![list![], list![Field::Null], Field::Null]
1252                            )]
1253                        ),
1254                        ("g".to_string(), Field::Null)
1255                    ]
1256                )
1257            ],
1258        ];
1259        assert_eq!(rows, expected_rows);
1260    }
1261
1262    #[test]
1263    fn test_file_reader_rows_projection() {
1264        let schema = "
1265      message spark_schema {
1266        REQUIRED DOUBLE c;
1267        REQUIRED INT32 b;
1268      }
1269    ";
1270        let schema = parse_message_type(schema).unwrap();
1271        let rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1272        let expected_rows = vec![
1273            row![
1274                ("c".to_string(), Field::Double(1.0)),
1275                ("b".to_string(), Field::Int(1))
1276            ],
1277            row![
1278                ("c".to_string(), Field::Double(1.0)),
1279                ("b".to_string(), Field::Int(1))
1280            ],
1281            row![
1282                ("c".to_string(), Field::Double(1.0)),
1283                ("b".to_string(), Field::Int(1))
1284            ],
1285            row![
1286                ("c".to_string(), Field::Double(1.0)),
1287                ("b".to_string(), Field::Int(1))
1288            ],
1289            row![
1290                ("c".to_string(), Field::Double(1.0)),
1291                ("b".to_string(), Field::Int(1))
1292            ],
1293            row![
1294                ("c".to_string(), Field::Double(1.0)),
1295                ("b".to_string(), Field::Int(1))
1296            ],
1297        ];
1298        assert_eq!(rows, expected_rows);
1299    }
1300
1301    #[test]
1302    fn test_iter_columns_in_row() {
1303        let r = row![
1304            ("c".to_string(), Field::Double(1.0)),
1305            ("b".to_string(), Field::Int(1))
1306        ];
1307        let mut result = Vec::new();
1308        for (name, record) in r.get_column_iter() {
1309            result.push((name, record));
1310        }
1311        assert_eq!(
1312            vec![
1313                (&"c".to_string(), &Field::Double(1.0)),
1314                (&"b".to_string(), &Field::Int(1))
1315            ],
1316            result
1317        );
1318    }
1319
1320    #[test]
1321    fn test_into_columns_in_row() {
1322        let r = row![
1323            ("a".to_string(), Field::Str("My string".to_owned())),
1324            ("b".to_string(), Field::Int(1))
1325        ];
1326        assert_eq!(
1327            r.into_columns(),
1328            vec![
1329                ("a".to_string(), Field::Str("My string".to_owned())),
1330                ("b".to_string(), Field::Int(1)),
1331            ]
1332        );
1333    }
1334
1335    #[test]
1336    fn test_file_reader_rows_projection_map() {
1337        let schema = "
1338      message spark_schema {
1339        OPTIONAL group a (MAP) {
1340          REPEATED group key_value {
1341            REQUIRED BYTE_ARRAY key (UTF8);
1342            OPTIONAL group value (MAP) {
1343              REPEATED group key_value {
1344                REQUIRED INT32 key;
1345                REQUIRED BOOLEAN value;
1346              }
1347            }
1348          }
1349        }
1350      }
1351    ";
1352        let schema = parse_message_type(schema).unwrap();
1353        let rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1354        let expected_rows = vec![
1355            row![(
1356                "a".to_string(),
1357                map![(
1358                    Field::Str("a".to_string()),
1359                    map![
1360                        (Field::Int(1), Field::Bool(true)),
1361                        (Field::Int(2), Field::Bool(false))
1362                    ]
1363                )]
1364            )],
1365            row![(
1366                "a".to_string(),
1367                map![(
1368                    Field::Str("b".to_string()),
1369                    map![(Field::Int(1), Field::Bool(true))]
1370                )]
1371            )],
1372            row![(
1373                "a".to_string(),
1374                map![(Field::Str("c".to_string()), Field::Null)]
1375            )],
1376            row![("a".to_string(), map![(Field::Str("d".to_string()), map![])])],
1377            row![(
1378                "a".to_string(),
1379                map![(
1380                    Field::Str("e".to_string()),
1381                    map![(Field::Int(1), Field::Bool(true))]
1382                )]
1383            )],
1384            row![(
1385                "a".to_string(),
1386                map![(
1387                    Field::Str("f".to_string()),
1388                    map![
1389                        (Field::Int(3), Field::Bool(true)),
1390                        (Field::Int(4), Field::Bool(false)),
1391                        (Field::Int(5), Field::Bool(true))
1392                    ]
1393                )]
1394            )],
1395        ];
1396        assert_eq!(rows, expected_rows);
1397    }
1398
1399    #[test]
1400    fn test_file_reader_rows_projection_list() {
1401        let schema = "
1402      message spark_schema {
1403        OPTIONAL group a (LIST) {
1404          REPEATED group list {
1405            OPTIONAL group element (LIST) {
1406              REPEATED group list {
1407                OPTIONAL group element (LIST) {
1408                  REPEATED group list {
1409                    OPTIONAL BYTE_ARRAY element (UTF8);
1410                  }
1411                }
1412              }
1413            }
1414          }
1415        }
1416      }
1417    ";
1418        let schema = parse_message_type(schema).unwrap();
1419        let rows = test_file_reader_rows("nested_lists.snappy.parquet", Some(schema)).unwrap();
1420        let expected_rows = vec![
1421            row![(
1422                "a".to_string(),
1423                list![
1424                    list![
1425                        list![Field::Str("a".to_string()), Field::Str("b".to_string())],
1426                        list![Field::Str("c".to_string())]
1427                    ],
1428                    list![Field::Null, list![Field::Str("d".to_string())]]
1429                ]
1430            )],
1431            row![(
1432                "a".to_string(),
1433                list![
1434                    list![
1435                        list![Field::Str("a".to_string()), Field::Str("b".to_string())],
1436                        list![Field::Str("c".to_string()), Field::Str("d".to_string())]
1437                    ],
1438                    list![Field::Null, list![Field::Str("e".to_string())]]
1439                ]
1440            )],
1441            row![(
1442                "a".to_string(),
1443                list![
1444                    list![
1445                        list![Field::Str("a".to_string()), Field::Str("b".to_string())],
1446                        list![Field::Str("c".to_string()), Field::Str("d".to_string())],
1447                        list![Field::Str("e".to_string())]
1448                    ],
1449                    list![Field::Null, list![Field::Str("f".to_string())]]
1450                ]
1451            )],
1452        ];
1453        assert_eq!(rows, expected_rows);
1454    }
1455
1456    #[test]
1457    fn test_file_reader_rows_invalid_projection() {
1458        let schema = "
1459      message spark_schema {
1460        REQUIRED INT32 key;
1461        REQUIRED BOOLEAN value;
1462      }
1463    ";
1464        let schema = parse_message_type(schema).unwrap();
1465        let res = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema));
1466        assert_eq!(
1467            res.unwrap_err().to_string(),
1468            "Parquet error: Root schema does not contain projection"
1469        );
1470    }
1471
1472    #[test]
1473    fn test_row_group_rows_invalid_projection() {
1474        let schema = "
1475      message spark_schema {
1476        REQUIRED INT32 key;
1477        REQUIRED BOOLEAN value;
1478      }
1479    ";
1480        let schema = parse_message_type(schema).unwrap();
1481        let res = test_row_group_rows("nested_maps.snappy.parquet", Some(schema));
1482        assert_eq!(
1483            res.unwrap_err().to_string(),
1484            "Parquet error: Root schema does not contain projection"
1485        );
1486    }
1487
1488    #[test]
1489    fn test_file_reader_rows_nested_map_type() {
1490        let schema = "
1491      message spark_schema {
1492        OPTIONAL group a (MAP) {
1493          REPEATED group key_value {
1494            REQUIRED BYTE_ARRAY key (UTF8);
1495            OPTIONAL group value (MAP) {
1496              REPEATED group key_value {
1497                REQUIRED INT32 key;
1498              }
1499            }
1500          }
1501        }
1502      }
1503    ";
1504        let schema = parse_message_type(schema).unwrap();
1505        test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1506    }
1507
1508    #[test]
1509    fn test_file_reader_iter() {
1510        let path = get_test_path("alltypes_plain.parquet");
1511        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1512        let iter = RowIter::from_file_into(Box::new(reader));
1513
1514        let values: Vec<_> = iter.flat_map(|r| r.unwrap().get_int(0)).collect();
1515        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1516    }
1517
1518    #[test]
1519    fn test_file_reader_iter_projection() {
1520        let path = get_test_path("alltypes_plain.parquet");
1521        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1522        let schema = "message schema { OPTIONAL INT32 id; }";
1523        let proj = parse_message_type(schema).ok();
1524
1525        let iter = RowIter::from_file_into(Box::new(reader))
1526            .project(proj)
1527            .unwrap();
1528        let values: Vec<_> = iter.flat_map(|r| r.unwrap().get_int(0)).collect();
1529
1530        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1531    }
1532
1533    #[test]
1534    fn test_file_reader_iter_projection_err() {
1535        let schema = "
1536      message spark_schema {
1537        REQUIRED INT32 key;
1538        REQUIRED BOOLEAN value;
1539      }
1540    ";
1541        let proj = parse_message_type(schema).ok();
1542        let path = get_test_path("nested_maps.snappy.parquet");
1543        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1544        let res = RowIter::from_file_into(Box::new(reader)).project(proj);
1545
1546        assert_eq!(
1547            res.err().unwrap().to_string(),
1548            "Parquet error: Root schema does not contain projection"
1549        );
1550    }
1551
1552    #[test]
1553    fn test_tree_reader_handle_repeated_fields_with_no_annotation() {
1554        // Array field `phoneNumbers` does not contain LIST annotation.
1555        // We parse it as struct with `phone` repeated field as array.
1556        let rows = test_file_reader_rows("repeated_no_annotation.parquet", None).unwrap();
1557        let expected_rows = vec![
1558            row![
1559                ("id".to_string(), Field::Int(1)),
1560                ("phoneNumbers".to_string(), Field::Null)
1561            ],
1562            row![
1563                ("id".to_string(), Field::Int(2)),
1564                ("phoneNumbers".to_string(), Field::Null)
1565            ],
1566            row![
1567                ("id".to_string(), Field::Int(3)),
1568                (
1569                    "phoneNumbers".to_string(),
1570                    group![("phone".to_string(), list![])]
1571                )
1572            ],
1573            row![
1574                ("id".to_string(), Field::Int(4)),
1575                (
1576                    "phoneNumbers".to_string(),
1577                    group![(
1578                        "phone".to_string(),
1579                        list![group![
1580                            ("number".to_string(), Field::Long(5555555555)),
1581                            ("kind".to_string(), Field::Null)
1582                        ]]
1583                    )]
1584                )
1585            ],
1586            row![
1587                ("id".to_string(), Field::Int(5)),
1588                (
1589                    "phoneNumbers".to_string(),
1590                    group![(
1591                        "phone".to_string(),
1592                        list![group![
1593                            ("number".to_string(), Field::Long(1111111111)),
1594                            ("kind".to_string(), Field::Str("home".to_string()))
1595                        ]]
1596                    )]
1597                )
1598            ],
1599            row![
1600                ("id".to_string(), Field::Int(6)),
1601                (
1602                    "phoneNumbers".to_string(),
1603                    group![(
1604                        "phone".to_string(),
1605                        list![
1606                            group![
1607                                ("number".to_string(), Field::Long(1111111111)),
1608                                ("kind".to_string(), Field::Str("home".to_string()))
1609                            ],
1610                            group![
1611                                ("number".to_string(), Field::Long(2222222222)),
1612                                ("kind".to_string(), Field::Null)
1613                            ],
1614                            group![
1615                                ("number".to_string(), Field::Long(3333333333)),
1616                                ("kind".to_string(), Field::Str("mobile".to_string()))
1617                            ]
1618                        ]
1619                    )]
1620                )
1621            ],
1622        ];
1623
1624        assert_eq!(rows, expected_rows);
1625    }
1626
1627    #[test]
1628    fn test_tree_reader_handle_nested_repeated_fields_with_no_annotation() {
1629        // Create schema
1630        let schema = Arc::new(
1631            parse_message_type(
1632                "
1633            message schema {
1634                REPEATED group level1 {
1635                    REPEATED group level2 {
1636                        REQUIRED group level3 {
1637                            REQUIRED INT64 value3;
1638                        }
1639                    }
1640                    REQUIRED INT64 value1;
1641                }
1642            }",
1643            )
1644            .unwrap(),
1645        );
1646
1647        // Write Parquet file to buffer
1648        let mut buffer: Vec<u8> = Vec::new();
1649        let mut file_writer =
1650            SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
1651        let mut row_group_writer = file_writer.next_row_group().unwrap();
1652
1653        // Write column level1.level2.level3.value3
1654        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
1655        column_writer
1656            .typed::<Int64Type>()
1657            .write_batch(&[30, 31, 32], Some(&[2, 2, 2]), Some(&[0, 0, 0]))
1658            .unwrap();
1659        column_writer.close().unwrap();
1660
1661        // Write column level1.value1
1662        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
1663        column_writer
1664            .typed::<Int64Type>()
1665            .write_batch(&[10, 11, 12], Some(&[1, 1, 1]), Some(&[0, 0, 0]))
1666            .unwrap();
1667        column_writer.close().unwrap();
1668
1669        // Finalize Parquet file
1670        row_group_writer.close().unwrap();
1671        file_writer.close().unwrap();
1672        assert_eq!(&buffer[0..4], b"PAR1");
1673
1674        // Read Parquet file from buffer
1675        let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
1676        let rows: Vec<_> = file_reader
1677            .get_row_iter(None)
1678            .unwrap()
1679            .map(|row| row.unwrap())
1680            .collect();
1681
1682        let expected_rows = vec![
1683            row![(
1684                "level1".to_string(),
1685                list![group![
1686                    (
1687                        "level2".to_string(),
1688                        list![group![(
1689                            "level3".to_string(),
1690                            group![("value3".to_string(), Field::Long(30))]
1691                        )]]
1692                    ),
1693                    ("value1".to_string(), Field::Long(10))
1694                ]]
1695            )],
1696            row![(
1697                "level1".to_string(),
1698                list![group![
1699                    (
1700                        "level2".to_string(),
1701                        list![group![(
1702                            "level3".to_string(),
1703                            group![("value3".to_string(), Field::Long(31))]
1704                        )]]
1705                    ),
1706                    ("value1".to_string(), Field::Long(11))
1707                ]]
1708            )],
1709            row![(
1710                "level1".to_string(),
1711                list![group![
1712                    (
1713                        "level2".to_string(),
1714                        list![group![(
1715                            "level3".to_string(),
1716                            group![("value3".to_string(), Field::Long(32))]
1717                        )]]
1718                    ),
1719                    ("value1".to_string(), Field::Long(12))
1720                ]]
1721            )],
1722        ];
1723
1724        assert_eq!(rows, expected_rows);
1725    }
1726
1727    #[test]
1728    fn test_tree_reader_handle_primitive_repeated_fields_with_no_annotation() {
1729        // In this test the REPEATED fields are primitives
1730        let rows = test_file_reader_rows("repeated_primitive_no_list.parquet", None).unwrap();
1731        let expected_rows = vec![
1732            row![
1733                (
1734                    "Int32_list".to_string(),
1735                    Field::ListInternal(make_list([0, 1, 2, 3].map(Field::Int).to_vec()))
1736                ),
1737                (
1738                    "String_list".to_string(),
1739                    Field::ListInternal(make_list(
1740                        ["foo", "zero", "one", "two"]
1741                            .map(|s| Field::Str(s.to_string()))
1742                            .to_vec()
1743                    ))
1744                ),
1745                (
1746                    "group_of_lists".to_string(),
1747                    group![
1748                        (
1749                            "Int32_list_in_group".to_string(),
1750                            Field::ListInternal(make_list([0, 1, 2, 3].map(Field::Int).to_vec()))
1751                        ),
1752                        (
1753                            "String_list_in_group".to_string(),
1754                            Field::ListInternal(make_list(
1755                                ["foo", "zero", "one", "two"]
1756                                    .map(|s| Field::Str(s.to_string()))
1757                                    .to_vec()
1758                            ))
1759                        )
1760                    ]
1761                )
1762            ],
1763            row![
1764                (
1765                    "Int32_list".to_string(),
1766                    Field::ListInternal(make_list(vec![]))
1767                ),
1768                (
1769                    "String_list".to_string(),
1770                    Field::ListInternal(make_list(
1771                        ["three"].map(|s| Field::Str(s.to_string())).to_vec()
1772                    ))
1773                ),
1774                (
1775                    "group_of_lists".to_string(),
1776                    group![
1777                        (
1778                            "Int32_list_in_group".to_string(),
1779                            Field::ListInternal(make_list(vec![]))
1780                        ),
1781                        (
1782                            "String_list_in_group".to_string(),
1783                            Field::ListInternal(make_list(
1784                                ["three"].map(|s| Field::Str(s.to_string())).to_vec()
1785                            ))
1786                        )
1787                    ]
1788                )
1789            ],
1790            row![
1791                (
1792                    "Int32_list".to_string(),
1793                    Field::ListInternal(make_list(vec![Field::Int(4)]))
1794                ),
1795                (
1796                    "String_list".to_string(),
1797                    Field::ListInternal(make_list(
1798                        ["four"].map(|s| Field::Str(s.to_string())).to_vec()
1799                    ))
1800                ),
1801                (
1802                    "group_of_lists".to_string(),
1803                    group![
1804                        (
1805                            "Int32_list_in_group".to_string(),
1806                            Field::ListInternal(make_list(vec![Field::Int(4)]))
1807                        ),
1808                        (
1809                            "String_list_in_group".to_string(),
1810                            Field::ListInternal(make_list(
1811                                ["four"].map(|s| Field::Str(s.to_string())).to_vec()
1812                            ))
1813                        )
1814                    ]
1815                )
1816            ],
1817            row![
1818                (
1819                    "Int32_list".to_string(),
1820                    Field::ListInternal(make_list([5, 6, 7, 8].map(Field::Int).to_vec()))
1821                ),
1822                (
1823                    "String_list".to_string(),
1824                    Field::ListInternal(make_list(
1825                        ["five", "six", "seven", "eight"]
1826                            .map(|s| Field::Str(s.to_string()))
1827                            .to_vec()
1828                    ))
1829                ),
1830                (
1831                    "group_of_lists".to_string(),
1832                    group![
1833                        (
1834                            "Int32_list_in_group".to_string(),
1835                            Field::ListInternal(make_list([5, 6, 7, 8].map(Field::Int).to_vec()))
1836                        ),
1837                        (
1838                            "String_list_in_group".to_string(),
1839                            Field::ListInternal(make_list(
1840                                ["five", "six", "seven", "eight"]
1841                                    .map(|s| Field::Str(s.to_string()))
1842                                    .to_vec()
1843                            ))
1844                        )
1845                    ]
1846                )
1847            ],
1848        ];
1849        assert_eq!(rows, expected_rows);
1850    }
1851
1852    #[test]
1853    fn test_map_no_value() {
1854        // File schema:
1855        // message schema {
1856        //   required group my_map (MAP) {
1857        //     repeated group key_value {
1858        //       required int32 key;
1859        //       optional int32 value;
1860        //     }
1861        //   }
1862        //   required group my_map_no_v (MAP) {
1863        //     repeated group key_value {
1864        //       required int32 key;
1865        //     }
1866        //   }
1867        //   required group my_list (LIST) {
1868        //     repeated group list {
1869        //       required int32 element;
1870        //     }
1871        //   }
1872        // }
1873        let rows = test_file_reader_rows("map_no_value.parquet", None).unwrap();
1874
1875        // the my_map_no_v and my_list columns should be equivalent lists by this point
1876        for row in rows {
1877            let cols = row.into_columns();
1878            assert_eq!(cols[1].1, cols[2].1);
1879        }
1880    }
1881
1882    fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
1883        let file = get_test_file(file_name);
1884        let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
1885        let iter = file_reader.get_row_iter(schema)?;
1886        Ok(iter.map(|row| row.unwrap()).collect())
1887    }
1888
1889    fn test_row_group_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
1890        let file = get_test_file(file_name);
1891        let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
1892        // Check the first row group only, because files will contain only single row
1893        // group
1894        let row_group_reader = file_reader.get_row_group(0).unwrap();
1895        let iter = row_group_reader.get_row_iter(schema)?;
1896        Ok(iter.map(|row| row.unwrap()).collect())
1897    }
1898
1899    #[test]
1900    fn test_read_old_nested_list() {
1901        let rows = test_file_reader_rows("old_list_structure.parquet", None).unwrap();
1902        let expected_rows = vec![row![(
1903            "a".to_string(),
1904            Field::ListInternal(make_list(
1905                [
1906                    make_list([1, 2].map(Field::Int).to_vec()),
1907                    make_list([3, 4].map(Field::Int).to_vec())
1908                ]
1909                .map(Field::ListInternal)
1910                .to_vec()
1911            ))
1912        ),]];
1913        assert_eq!(rows, expected_rows);
1914    }
1915}