Skip to main content

parquet/record/
reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains implementation of record assembly and converting Parquet types into
19//! [`Row`]s.
20
21use std::{collections::HashMap, fmt, sync::Arc};
22
23use crate::basic::{ConvertedType, Repetition};
24use crate::errors::{ParquetError, Result};
25use crate::file::reader::{FileReader, RowGroupReader};
26use crate::record::{
27    api::{Field, Row, make_list, make_map},
28    triplet::TripletIter,
29};
30use crate::schema::types::{ColumnPath, SchemaDescPtr, SchemaDescriptor, Type, TypePtr};
31
/// Batch size used by [`TreeBuilder::new`] when no explicit value is supplied.
const DEFAULT_BATCH_SIZE: usize = 1024;
34
/// Builder for a [`Reader`] tree.
///
/// Holds the options used while constructing the tree, and offers
/// [`TreeBuilder::as_iter`] as a shortcut for obtaining a record iterator
/// over a single row group.
pub struct TreeBuilder {
    /// Batch size handed to every triplet iterator; expected to be >= 1.
    batch_size: usize,
}
42
43impl Default for TreeBuilder {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49impl TreeBuilder {
50    /// Creates new tree builder with default parameters.
51    pub fn new() -> Self {
52        Self {
53            batch_size: DEFAULT_BATCH_SIZE,
54        }
55    }
56
57    /// Sets batch size for this tree builder.
58    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
59        self.batch_size = batch_size;
60        self
61    }
62
63    /// Creates new root reader for provided schema and row group.
64    pub fn build(
65        &self,
66        descr: SchemaDescPtr,
67        row_group_reader: &dyn RowGroupReader,
68    ) -> Result<Reader> {
69        // Prepare lookup table of column path -> original column index
70        // This allows to prune columns and map schema leaf nodes to the column readers
71        let mut paths: HashMap<ColumnPath, usize> = HashMap::new();
72        let row_group_metadata = row_group_reader.metadata();
73
74        for col_index in 0..row_group_reader.num_columns() {
75            let col_meta = row_group_metadata.column(col_index);
76            let col_path = col_meta.column_path().clone();
77            paths.insert(col_path, col_index);
78        }
79
80        // Build child readers for the message type
81        let mut readers = Vec::new();
82        let mut path = Vec::new();
83
84        for field in descr.root_schema().get_fields() {
85            let reader =
86                self.reader_tree(field.clone(), &mut path, 0, 0, &paths, row_group_reader)?;
87            readers.push(reader);
88        }
89
90        // Return group reader for message type,
91        // it is always required with definition level 0
92        Ok(Reader::GroupReader(None, 0, readers))
93    }
94
95    /// Creates iterator of `Row`s directly from schema descriptor and row group.
96    pub fn as_iter(
97        &self,
98        descr: SchemaDescPtr,
99        row_group_reader: &dyn RowGroupReader,
100    ) -> Result<ReaderIter> {
101        let num_records = row_group_reader.metadata().num_rows() as usize;
102        ReaderIter::new(self.build(descr, row_group_reader)?, num_records)
103    }
104
105    /// Builds tree of readers for the current schema recursively.
106    fn reader_tree(
107        &self,
108        field: TypePtr,
109        path: &mut Vec<String>,
110        mut curr_def_level: i16,
111        mut curr_rep_level: i16,
112        paths: &HashMap<ColumnPath, usize>,
113        row_group_reader: &dyn RowGroupReader,
114    ) -> Result<Reader> {
115        assert!(field.get_basic_info().has_repetition());
116        // Update current definition and repetition levels for this type
117        let repetition = field.get_basic_info().repetition();
118        match repetition {
119            Repetition::OPTIONAL => {
120                curr_def_level += 1;
121            }
122            Repetition::REPEATED => {
123                curr_def_level += 1;
124                curr_rep_level += 1;
125            }
126            _ => {}
127        }
128
129        path.push(String::from(field.name()));
130        let reader = if field.is_primitive() {
131            let col_path = ColumnPath::new(path.to_vec());
132            let orig_index = *paths
133                .get(&col_path)
134                .ok_or(general_err!("Path {:?} not found", col_path))?;
135            let col_descr = row_group_reader
136                .metadata()
137                .column(orig_index)
138                .column_descr_ptr();
139            let col_reader = row_group_reader.get_column_reader(orig_index)?;
140            let column = TripletIter::new(col_descr, col_reader, self.batch_size);
141            let reader = Reader::PrimitiveReader(field.clone(), Box::new(column));
142            if repetition == Repetition::REPEATED {
143                Reader::RepeatedReader(
144                    field,
145                    curr_def_level - 1,
146                    curr_rep_level - 1,
147                    Box::new(reader),
148                )
149            } else {
150                reader
151            }
152        } else {
153            match field.get_basic_info().converted_type() {
154                // List types
155                ConvertedType::LIST => {
156                    assert_eq!(field.get_fields().len(), 1, "Invalid list type {field:?}");
157
158                    let repeated_field = field.get_fields()[0].clone();
159                    assert_eq!(
160                        repeated_field.get_basic_info().repetition(),
161                        Repetition::REPEATED,
162                        "Invalid list type {field:?}"
163                    );
164
165                    if Reader::is_element_type(&repeated_field) {
166                        // Support for backward compatible lists
167                        let reader = self.reader_tree(
168                            repeated_field,
169                            path,
170                            curr_def_level,
171                            curr_rep_level,
172                            paths,
173                            row_group_reader,
174                        )?;
175
176                        Reader::RepeatedReader(
177                            field,
178                            curr_def_level,
179                            curr_rep_level,
180                            Box::new(reader),
181                        )
182                    } else {
183                        let child_field = repeated_field.get_fields()[0].clone();
184
185                        path.push(String::from(repeated_field.name()));
186
187                        let reader = self.reader_tree(
188                            child_field,
189                            path,
190                            curr_def_level + 1,
191                            curr_rep_level + 1,
192                            paths,
193                            row_group_reader,
194                        )?;
195
196                        path.pop();
197
198                        Reader::RepeatedReader(
199                            field,
200                            curr_def_level,
201                            curr_rep_level,
202                            Box::new(reader),
203                        )
204                    }
205                }
206                // Map types (key-value pairs)
207                ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => {
208                    assert_eq!(field.get_fields().len(), 1, "Invalid map type: {field:?}");
209                    assert!(
210                        !field.get_fields()[0].is_primitive(),
211                        "Invalid map type: {field:?}"
212                    );
213
214                    let key_value_type = field.get_fields()[0].clone();
215                    assert_eq!(
216                        key_value_type.get_basic_info().repetition(),
217                        Repetition::REPEATED,
218                        "Invalid map type: {field:?}"
219                    );
220                    // Parquet spec allows no value. In that case treat as a list. #1642
221                    if key_value_type.get_fields().len() != 1 {
222                        // If not a list, then there can only be 2 fields in the struct
223                        assert_eq!(
224                            key_value_type.get_fields().len(),
225                            2,
226                            "Invalid map type: {field:?}"
227                        );
228                    }
229
230                    path.push(String::from(key_value_type.name()));
231
232                    let key_type = &key_value_type.get_fields()[0];
233                    assert!(
234                        key_type.is_primitive(),
235                        "Map key type is expected to be a primitive type, but found {key_type:?}"
236                    );
237                    let key_reader = self.reader_tree(
238                        key_type.clone(),
239                        path,
240                        curr_def_level + 1,
241                        curr_rep_level + 1,
242                        paths,
243                        row_group_reader,
244                    )?;
245
246                    if key_value_type.get_fields().len() == 1 {
247                        path.pop();
248                        Reader::RepeatedReader(
249                            field,
250                            curr_def_level,
251                            curr_rep_level,
252                            Box::new(key_reader),
253                        )
254                    } else {
255                        let value_type = &key_value_type.get_fields()[1];
256                        let value_reader = self.reader_tree(
257                            value_type.clone(),
258                            path,
259                            curr_def_level + 1,
260                            curr_rep_level + 1,
261                            paths,
262                            row_group_reader,
263                        )?;
264
265                        path.pop();
266
267                        Reader::KeyValueReader(
268                            field,
269                            curr_def_level,
270                            curr_rep_level,
271                            Box::new(key_reader),
272                            Box::new(value_reader),
273                        )
274                    }
275                }
276                // A repeated field that is neither contained by a `LIST`- or
277                // `MAP`-annotated group nor annotated by `LIST` or `MAP`
278                // should be interpreted as a required list of required
279                // elements where the element type is the type of the field.
280                _ if repetition == Repetition::REPEATED => {
281                    let required_field = Type::group_type_builder(field.name())
282                        .with_repetition(Repetition::REQUIRED)
283                        .with_converted_type(field.get_basic_info().converted_type())
284                        .with_fields(field.get_fields().to_vec())
285                        .build()?;
286
287                    path.pop();
288
289                    let reader = self.reader_tree(
290                        Arc::new(required_field),
291                        path,
292                        curr_def_level,
293                        curr_rep_level,
294                        paths,
295                        row_group_reader,
296                    )?;
297
298                    return Ok(Reader::RepeatedReader(
299                        field,
300                        curr_def_level - 1,
301                        curr_rep_level - 1,
302                        Box::new(reader),
303                    ));
304                }
305                // Group types (structs)
306                _ => {
307                    let mut readers = Vec::new();
308                    for child in field.get_fields() {
309                        let reader = self.reader_tree(
310                            child.clone(),
311                            path,
312                            curr_def_level,
313                            curr_rep_level,
314                            paths,
315                            row_group_reader,
316                        )?;
317                        readers.push(reader);
318                    }
319                    Reader::GroupReader(Some(field), curr_def_level, readers)
320                }
321            }
322        };
323        path.pop();
324
325        Ok(Reader::option(repetition, curr_def_level, reader))
326    }
327}
328
329/// Reader tree for record assembly
330pub enum Reader {
331    /// Primitive reader with type information and triplet iterator
332    PrimitiveReader(TypePtr, Box<TripletIter>),
333    /// Optional reader with definition level of a parent and a reader
334    OptionReader(i16, Box<Reader>),
335    /// Group (struct) reader with type information, definition level and list of child
336    /// readers. When it represents message type, type information is None
337    GroupReader(Option<TypePtr>, i16, Vec<Reader>),
338    /// Reader for repeated values, e.g. lists, contains type information, definition
339    /// level, repetition level and a child reader
340    RepeatedReader(TypePtr, i16, i16, Box<Reader>),
341    /// Reader of key-value pairs, e.g. maps, contains type information, definition
342    /// level, repetition level, child reader for keys and child reader for values
343    KeyValueReader(TypePtr, i16, i16, Box<Reader>, Box<Reader>),
344}
345
346impl Reader {
347    /// Wraps reader in option reader based on repetition.
348    fn option(repetition: Repetition, def_level: i16, reader: Reader) -> Self {
349        if repetition == Repetition::OPTIONAL {
350            Reader::OptionReader(def_level - 1, Box::new(reader))
351        } else {
352            reader
353        }
354    }
355
356    /// Returns true if repeated type is an element type for the list.
357    /// Used to determine legacy list types.
358    /// This method is copied from Spark Parquet reader and is based on the reference:
359    /// <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md>
360    ///   #backward-compatibility-rules
361    fn is_element_type(repeated_type: &Type) -> bool {
362        // For legacy 2-level list types whose element type is a 2-level list
363        //
364        //    // ARRAY<ARRAY<INT>> (nullable list, non-null elements)
365        //    optional group my_list (LIST) {
366        //      repeated group array (LIST) {
367        //        repeated int32 array;
368        //      };
369        //    }
370        //
371        if repeated_type.is_list() || repeated_type.has_single_repeated_child() {
372            return false;
373        }
374
375        // For legacy 2-level list types with primitive element type, e.g.:
376        //
377        //    // ARRAY<INT> (nullable list, non-null elements)
378        //    optional group my_list (LIST) {
379        //      repeated int32 element;
380        //    }
381        //
382        repeated_type.is_primitive() ||
383    // For legacy 2-level list types whose element type is a group type with 2 or more
384    // fields, e.g.:
385    //
386    //    // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements)
387    //    optional group my_list (LIST) {
388    //      repeated group element {
389    //        required binary str (UTF8);
390    //        required int32 num;
391    //      };
392    //    }
393    //
394    repeated_type.is_group() && repeated_type.get_fields().len() > 1 ||
395    // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0),
396    // e.g.:
397    //
398    //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
399    //    optional group my_list (LIST) {
400    //      repeated group array {
401    //        required binary str (UTF8);
402    //      };
403    //    }
404    //
405    repeated_type.name() == "array" ||
406    // For Parquet data generated by parquet-thrift, e.g.:
407    //
408    //    // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements)
409    //    optional group my_list (LIST) {
410    //      repeated group my_list_tuple {
411    //        required binary str (UTF8);
412    //      };
413    //    }
414    //
415    repeated_type.name().ends_with("_tuple")
416    }
417
418    /// Reads current record as `Row` from the reader tree.
419    /// Automatically advances all necessary readers.
420    /// This must be called on the root level reader (i.e., for Message type).
421    /// Otherwise, it will panic.
422    fn read(&mut self) -> Result<Row> {
423        match *self {
424            Reader::GroupReader(_, _, ref mut readers) => {
425                let mut fields = Vec::new();
426                for reader in readers {
427                    fields.push((String::from(reader.field_name()), reader.read_field()?));
428                }
429                Ok(Row::new(fields))
430            }
431            _ => panic!("Cannot call read() on {self}"),
432        }
433    }
434
435    /// Reads current record as `Field` from the reader tree.
436    /// Automatically advances all necessary readers.
437    fn read_field(&mut self) -> Result<Field> {
438        let field = match *self {
439            Reader::PrimitiveReader(_, ref mut column) => {
440                if !column.has_next() {
441                    return Err(general_err!("Unexpected end of column data"));
442                }
443                let value = column.current_value()?;
444                column.read_next()?;
445                value
446            }
447            Reader::OptionReader(def_level, ref mut reader) => {
448                if !reader.has_next() {
449                    return Err(general_err!("Unexpected end of column data"));
450                }
451                if reader.current_def_level() > def_level {
452                    reader.read_field()?
453                } else {
454                    reader.advance_columns()?;
455                    Field::Null
456                }
457            }
458            Reader::GroupReader(_, def_level, ref mut readers) => {
459                let mut fields = Vec::new();
460                for reader in readers {
461                    if reader.repetition() != Repetition::OPTIONAL
462                        || reader.current_def_level() > def_level
463                    {
464                        fields.push((String::from(reader.field_name()), reader.read_field()?));
465                    } else {
466                        reader.advance_columns()?;
467                        fields.push((String::from(reader.field_name()), Field::Null));
468                    }
469                }
470                let row = Row::new(fields);
471                Field::Group(row)
472            }
473            Reader::RepeatedReader(_, def_level, rep_level, ref mut reader) => {
474                if !reader.has_next() {
475                    return Err(general_err!("Unexpected end of column data"));
476                }
477                let mut elements = Vec::new();
478                loop {
479                    if reader.current_def_level() > def_level {
480                        elements.push(reader.read_field()?);
481                    } else {
482                        reader.advance_columns()?;
483                        // If the current definition level is equal to the definition
484                        // level of this repeated type, then the
485                        // result is an empty list and the repetition level
486                        // will always be <= rl.
487                        break;
488                    }
489
490                    // This covers case when we are out of repetition levels and should
491                    // close the group, or there are no values left to
492                    // buffer.
493                    if !reader.has_next() || reader.current_rep_level() <= rep_level {
494                        break;
495                    }
496                }
497                Field::ListInternal(make_list(elements))
498            }
499            Reader::KeyValueReader(_, def_level, rep_level, ref mut keys, ref mut values) => {
500                if !keys.has_next() {
501                    return Err(general_err!("Unexpected end of column data"));
502                }
503                let mut pairs = Vec::new();
504                loop {
505                    if keys.current_def_level() > def_level {
506                        pairs.push((keys.read_field()?, values.read_field()?));
507                    } else {
508                        keys.advance_columns()?;
509                        values.advance_columns()?;
510                        // If the current definition level is equal to the definition
511                        // level of this repeated type, then the
512                        // result is an empty list and the repetition level
513                        // will always be <= rl.
514                        break;
515                    }
516
517                    // This covers case when we are out of repetition levels and should
518                    // close the group, or there are no values left to
519                    // buffer.
520                    if !keys.has_next() || keys.current_rep_level() <= rep_level {
521                        break;
522                    }
523                }
524
525                Field::MapInternal(make_map(pairs))
526            }
527        };
528        Ok(field)
529    }
530
531    /// Returns field name for the current reader.
532    fn field_name(&self) -> &str {
533        match *self {
534            Reader::PrimitiveReader(ref field, _) => field.name(),
535            Reader::OptionReader(_, ref reader) => reader.field_name(),
536            Reader::GroupReader(ref opt, ..) => match opt {
537                Some(field) => field.name(),
538                None => panic!("Field is None for group reader"),
539            },
540            Reader::RepeatedReader(ref field, ..) => field.name(),
541            Reader::KeyValueReader(ref field, ..) => field.name(),
542        }
543    }
544
545    /// Returns repetition for the current reader.
546    fn repetition(&self) -> Repetition {
547        match *self {
548            Reader::PrimitiveReader(ref field, _) => field.get_basic_info().repetition(),
549            Reader::OptionReader(_, ref reader) => reader.repetition(),
550            Reader::GroupReader(ref opt, ..) => match opt {
551                Some(field) => field.get_basic_info().repetition(),
552                None => panic!("Field is None for group reader"),
553            },
554            Reader::RepeatedReader(ref field, ..) => field.get_basic_info().repetition(),
555            Reader::KeyValueReader(ref field, ..) => field.get_basic_info().repetition(),
556        }
557    }
558
559    /// Returns true, if current reader has more values, false otherwise.
560    /// Method does not advance internal iterator.
561    fn has_next(&self) -> bool {
562        match *self {
563            Reader::PrimitiveReader(_, ref column) => column.has_next(),
564            Reader::OptionReader(_, ref reader) => reader.has_next(),
565            Reader::GroupReader(_, _, ref readers) => readers.first().unwrap().has_next(),
566            Reader::RepeatedReader(_, _, _, ref reader) => reader.has_next(),
567            Reader::KeyValueReader(_, _, _, ref keys, _) => keys.has_next(),
568        }
569    }
570
571    /// Returns current definition level,
572    /// Method does not advance internal iterator.
573    fn current_def_level(&self) -> i16 {
574        match *self {
575            Reader::PrimitiveReader(_, ref column) => column.current_def_level(),
576            Reader::OptionReader(_, ref reader) => reader.current_def_level(),
577            Reader::GroupReader(_, _, ref readers) => match readers.first() {
578                Some(reader) => reader.current_def_level(),
579                None => panic!("Current definition level: empty group reader"),
580            },
581            Reader::RepeatedReader(_, _, _, ref reader) => reader.current_def_level(),
582            Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_def_level(),
583        }
584    }
585
586    /// Returns current repetition level.
587    /// Method does not advance internal iterator.
588    fn current_rep_level(&self) -> i16 {
589        match *self {
590            Reader::PrimitiveReader(_, ref column) => column.current_rep_level(),
591            Reader::OptionReader(_, ref reader) => reader.current_rep_level(),
592            Reader::GroupReader(_, _, ref readers) => match readers.first() {
593                Some(reader) => reader.current_rep_level(),
594                None => panic!("Current repetition level: empty group reader"),
595            },
596            Reader::RepeatedReader(_, _, _, ref reader) => reader.current_rep_level(),
597            Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_rep_level(),
598        }
599    }
600
601    /// Advances leaf columns for the current reader.
602    fn advance_columns(&mut self) -> Result<()> {
603        match *self {
604            Reader::PrimitiveReader(_, ref mut column) => column.read_next().map(|_| ()),
605            Reader::OptionReader(_, ref mut reader) => reader.advance_columns(),
606            Reader::GroupReader(_, _, ref mut readers) => {
607                for reader in readers {
608                    reader.advance_columns()?;
609                }
610                Ok(())
611            }
612            Reader::RepeatedReader(_, _, _, ref mut reader) => reader.advance_columns(),
613            Reader::KeyValueReader(_, _, _, ref mut keys, ref mut values) => {
614                keys.advance_columns()?;
615                values.advance_columns()
616            }
617        }
618    }
619}
620
621impl fmt::Display for Reader {
622    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
623        let s = match self {
624            Reader::PrimitiveReader(..) => "PrimitiveReader",
625            Reader::OptionReader(..) => "OptionReader",
626            Reader::GroupReader(..) => "GroupReader",
627            Reader::RepeatedReader(..) => "RepeatedReader",
628            Reader::KeyValueReader(..) => "KeyValueReader",
629        };
630        write!(f, "{s}")
631    }
632}
633
634// ----------------------------------------------------------------------
635// Row iterators
636
637/// The enum Either with variants That represents a reference and a box of
638/// [`FileReader`].
639enum Either<'a> {
640    Left(&'a dyn FileReader),
641    Right(Box<dyn FileReader>),
642}
643
644impl Either<'_> {
645    fn reader(&self) -> &dyn FileReader {
646        match *self {
647            Either::Left(r) => r,
648            Either::Right(ref r) => &**r,
649        }
650    }
651}
652
653/// Access parquet data as an iterator of [`Row`]
654///
655/// # Caveats
656///
657/// Parquet stores data in a columnar fashion using [Dremel] encoding, and is therefore highly
658/// optimised for reading data by column, not row. As a consequence applications concerned with
659/// performance should prefer the columnar arrow or [ColumnReader] APIs.
660///
661/// Additionally the current implementation does not correctly handle repeated fields ([#2394]),
662/// and workloads looking to handle such schema should use the other APIs.
663///
664/// [#2394]: https://github.com/apache/arrow-rs/issues/2394
665/// [ColumnReader]: crate::file::reader::RowGroupReader::get_column_reader
666/// [Dremel]: https://research.google/pubs/pub36632/
667pub struct RowIter<'a> {
668    descr: SchemaDescPtr,
669    tree_builder: TreeBuilder,
670    file_reader: Option<Either<'a>>,
671    current_row_group: usize,
672    num_row_groups: usize,
673    row_iter: Option<ReaderIter>,
674}
675
676impl<'a> RowIter<'a> {
677    /// Creates a new iterator of [`Row`]s.
678    fn new(
679        file_reader: Option<Either<'a>>,
680        row_iter: Option<ReaderIter>,
681        descr: SchemaDescPtr,
682    ) -> Self {
683        let tree_builder = Self::tree_builder();
684        let num_row_groups = match file_reader {
685            Some(ref r) => r.reader().num_row_groups(),
686            None => 0,
687        };
688
689        Self {
690            descr,
691            file_reader,
692            tree_builder,
693            num_row_groups,
694            row_iter,
695            current_row_group: 0,
696        }
697    }
698
699    /// Creates iterator of [`Row`]s for all row groups in a
700    /// file.
701    pub fn from_file(proj: Option<Type>, reader: &'a dyn FileReader) -> Result<Self> {
702        let either = Either::Left(reader);
703        let descr =
704            Self::get_proj_descr(proj, reader.metadata().file_metadata().schema_descr_ptr())?;
705
706        Ok(Self::new(Some(either), None, descr))
707    }
708
709    /// Creates iterator of [`Row`]s for a specific row group.
710    pub fn from_row_group(proj: Option<Type>, reader: &'a dyn RowGroupReader) -> Result<Self> {
711        let descr = Self::get_proj_descr(proj, reader.metadata().schema_descr_ptr())?;
712        let tree_builder = Self::tree_builder();
713        let row_iter = tree_builder.as_iter(descr.clone(), reader)?;
714
715        // For row group we need to set `current_row_group` >= `num_row_groups`, because
716        // we only have one row group and can't buffer more.
717        Ok(Self::new(None, Some(row_iter), descr))
718    }
719
720    /// Creates a iterator of [`Row`]s from a [`FileReader`] using the full file schema.
721    pub fn from_file_into(reader: Box<dyn FileReader>) -> Self {
722        let either = Either::Right(reader);
723        let descr = either
724            .reader()
725            .metadata()
726            .file_metadata()
727            .schema_descr_ptr();
728
729        Self::new(Some(either), None, descr)
730    }
731
732    /// Tries to create a iterator of [`Row`]s using projections.
733    /// Returns a error if a file reader is not the source of this iterator.
734    ///
735    /// The Projected schema can be a subset of or equal to the file schema,
736    /// when it is None, full file schema is assumed.
737    pub fn project(self, proj: Option<Type>) -> Result<Self> {
738        match self.file_reader {
739            Some(ref either) => {
740                let schema = either
741                    .reader()
742                    .metadata()
743                    .file_metadata()
744                    .schema_descr_ptr();
745                let descr = Self::get_proj_descr(proj, schema)?;
746
747                Ok(Self::new(self.file_reader, None, descr))
748            }
749            None => Err(general_err!("File reader is required to use projections")),
750        }
751    }
752
753    /// Helper method to get schema descriptor for projected schema.
754    /// If projection is None, then full schema is returned.
755    #[inline]
756    fn get_proj_descr(proj: Option<Type>, root_descr: SchemaDescPtr) -> Result<SchemaDescPtr> {
757        match proj {
758            Some(projection) => {
759                // check if projection is part of file schema
760                let root_schema = root_descr.root_schema();
761                if !root_schema.check_contains(&projection) {
762                    return Err(general_err!("Root schema does not contain projection"));
763                }
764                Ok(Arc::new(SchemaDescriptor::new(Arc::new(projection))))
765            }
766            None => Ok(root_descr),
767        }
768    }
769
770    /// Sets batch size for this row iter.
771    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
772        self.tree_builder = self.tree_builder.with_batch_size(batch_size);
773        self
774    }
775
776    /// Returns common tree builder, so the same settings are applied to both iterators
777    /// from file reader and row group.
778    #[inline]
779    fn tree_builder() -> TreeBuilder {
780        TreeBuilder::new()
781    }
782}
783
784impl Iterator for RowIter<'_> {
785    type Item = Result<Row>;
786
787    fn next(&mut self) -> Option<Result<Row>> {
788        let mut row = None;
789        if let Some(ref mut iter) = self.row_iter {
790            row = iter.next();
791        }
792
793        while row.is_none() && self.current_row_group < self.num_row_groups {
794            // We do not expect any failures when accessing a row group, and file reader
795            // must be set for selecting next row group.
796            if let Some(ref either) = self.file_reader {
797                let file_reader = either.reader();
798                let row_group_reader = &*file_reader
799                    .get_row_group(self.current_row_group)
800                    .expect("Row group is required to advance");
801
802                match self
803                    .tree_builder
804                    .as_iter(self.descr.clone(), row_group_reader)
805                {
806                    Ok(mut iter) => {
807                        row = iter.next();
808
809                        self.current_row_group += 1;
810                        self.row_iter = Some(iter);
811                    }
812                    Err(e) => return Some(Err(e)),
813                }
814            }
815        }
816
817        row
818    }
819}
820
/// Internal iterator of [`Row`]s for a reader.
///
/// Yields one `Result<Row>` per call by reading from the root reader, until the
/// number of records it was created with has been produced.
pub struct ReaderIter {
    // Root reader that each call to `next` reads one row from.
    root_reader: Reader,
    // Number of rows still to be produced; iteration ends when this reaches zero.
    records_left: usize,
}
826
827impl ReaderIter {
828    fn new(mut root_reader: Reader, num_records: usize) -> Result<Self> {
829        // Prepare root reader by advancing all column vectors
830        root_reader.advance_columns()?;
831        Ok(Self {
832            root_reader,
833            records_left: num_records,
834        })
835    }
836}
837
838impl Iterator for ReaderIter {
839    type Item = Result<Row>;
840
841    fn next(&mut self) -> Option<Result<Row>> {
842        if self.records_left > 0 {
843            self.records_left -= 1;
844            Some(self.root_reader.read())
845        } else {
846            None
847        }
848    }
849}
850
851#[cfg(test)]
852mod tests {
853    use super::*;
854
855    use crate::data_type::Int64Type;
856    use crate::file::reader::SerializedFileReader;
857    use crate::file::writer::SerializedFileWriter;
858    use crate::record::api::RowAccessor;
859    use crate::schema::parser::parse_message_type;
860    use crate::util::test_common::file_util::{get_test_file, get_test_path};
861    use bytes::Bytes;
862
863    // Convenient macros to assemble row, list, map, and group.
864
865    macro_rules! row {
866        ($($e:tt)*) => {
867            {
868                Row::new(vec![$($e)*])
869            }
870        }
871    }
872
873    macro_rules! list {
874        ($($e:tt)*) => {
875            {
876                Field::ListInternal(make_list(vec![$($e)*]))
877            }
878        }
879    }
880
881    macro_rules! map {
882        ($($e:tt)*) => {
883            {
884                Field::MapInternal(make_map(vec![$($e)*]))
885            }
886        }
887    }
888
889    macro_rules! group {
890        ( $( $e:expr ), * ) => {
891            {
892                Field::Group(row!($( $e ), *))
893            }
894        }
895    }
896
897    #[test]
898    fn test_file_reader_rows_nulls() {
899        let rows = test_file_reader_rows("nulls.snappy.parquet", None).unwrap();
900        let expected_rows = vec![
901            row![(
902                "b_struct".to_string(),
903                group![("b_c_int".to_string(), Field::Null)]
904            )],
905            row![(
906                "b_struct".to_string(),
907                group![("b_c_int".to_string(), Field::Null)]
908            )],
909            row![(
910                "b_struct".to_string(),
911                group![("b_c_int".to_string(), Field::Null)]
912            )],
913            row![(
914                "b_struct".to_string(),
915                group![("b_c_int".to_string(), Field::Null)]
916            )],
917            row![(
918                "b_struct".to_string(),
919                group![("b_c_int".to_string(), Field::Null)]
920            )],
921            row![(
922                "b_struct".to_string(),
923                group![("b_c_int".to_string(), Field::Null)]
924            )],
925            row![(
926                "b_struct".to_string(),
927                group![("b_c_int".to_string(), Field::Null)]
928            )],
929            row![(
930                "b_struct".to_string(),
931                group![("b_c_int".to_string(), Field::Null)]
932            )],
933        ];
934        assert_eq!(rows, expected_rows);
935    }
936
937    #[test]
938    fn test_file_reader_rows_nonnullable() {
939        let rows = test_file_reader_rows("nonnullable.impala.parquet", None).unwrap();
940        let expected_rows = vec![row![
941            ("ID".to_string(), Field::Long(8)),
942            ("Int_Array".to_string(), list![Field::Int(-1)]),
943            (
944                "int_array_array".to_string(),
945                list![list![Field::Int(-1), Field::Int(-2)], list![]]
946            ),
947            (
948                "Int_Map".to_string(),
949                map![(Field::Str("k1".to_string()), Field::Int(-1))]
950            ),
951            (
952                "int_map_array".to_string(),
953                list![
954                    map![],
955                    map![(Field::Str("k1".to_string()), Field::Int(1))],
956                    map![],
957                    map![]
958                ]
959            ),
960            (
961                "nested_Struct".to_string(),
962                group![
963                    ("a".to_string(), Field::Int(-1)),
964                    ("B".to_string(), list![Field::Int(-1)]),
965                    (
966                        "c".to_string(),
967                        group![(
968                            "D".to_string(),
969                            list![list![group![
970                                ("e".to_string(), Field::Int(-1)),
971                                ("f".to_string(), Field::Str("nonnullable".to_string()))
972                            ]]]
973                        )]
974                    ),
975                    ("G".to_string(), map![])
976                ]
977            )
978        ]];
979        assert_eq!(rows, expected_rows);
980    }
981
    // Reads `nullable.impala.parquet` with the full schema and compares all seven
    // rows against hand-written expectations. The rows differ in where nulls and
    // empty collections occur, from fully populated (id 1) to all-null (id 6).
    #[test]
    fn test_file_reader_rows_nullable() {
        let rows = test_file_reader_rows("nullable.impala.parquet", None).unwrap();
        let expected_rows = vec![
            // id 1: every column is populated and contains no nulls.
            row![
                ("id".to_string(), Field::Long(1)),
                (
                    "int_array".to_string(),
                    list![Field::Int(1), Field::Int(2), Field::Int(3)]
                ),
                (
                    "int_array_Array".to_string(),
                    list![
                        list![Field::Int(1), Field::Int(2)],
                        list![Field::Int(3), Field::Int(4)]
                    ]
                ),
                (
                    "int_map".to_string(),
                    map![
                        (Field::Str("k1".to_string()), Field::Int(1)),
                        (Field::Str("k2".to_string()), Field::Int(100))
                    ]
                ),
                (
                    "int_Map_Array".to_string(),
                    list![map![(Field::Str("k1".to_string()), Field::Int(1))]]
                ),
                (
                    "nested_struct".to_string(),
                    group![
                        ("A".to_string(), Field::Int(1)),
                        ("b".to_string(), list![Field::Int(1)]),
                        (
                            "C".to_string(),
                            group![(
                                "d".to_string(),
                                list![
                                    list![
                                        group![
                                            ("E".to_string(), Field::Int(10)),
                                            ("F".to_string(), Field::Str("aaa".to_string()))
                                        ],
                                        group![
                                            ("E".to_string(), Field::Int(-10)),
                                            ("F".to_string(), Field::Str("bbb".to_string()))
                                        ]
                                    ],
                                    list![group![
                                        ("E".to_string(), Field::Int(11)),
                                        ("F".to_string(), Field::Str("c".to_string()))
                                    ]]
                                ]
                            )]
                        ),
                        (
                            "g".to_string(),
                            map![(
                                Field::Str("foo".to_string()),
                                group![(
                                    "H".to_string(),
                                    group![("i".to_string(), list![Field::Double(1.1)])]
                                )]
                            )]
                        )
                    ]
                )
            ],
            // id 2: nulls appear as list elements, map values and nested group
            // fields, next to empty lists and maps.
            row![
                ("id".to_string(), Field::Long(2)),
                (
                    "int_array".to_string(),
                    list![
                        Field::Null,
                        Field::Int(1),
                        Field::Int(2),
                        Field::Null,
                        Field::Int(3),
                        Field::Null
                    ]
                ),
                (
                    "int_array_Array".to_string(),
                    list![
                        list![Field::Null, Field::Int(1), Field::Int(2), Field::Null],
                        list![Field::Int(3), Field::Null, Field::Int(4)],
                        list![],
                        Field::Null
                    ]
                ),
                (
                    "int_map".to_string(),
                    map![
                        (Field::Str("k1".to_string()), Field::Int(2)),
                        (Field::Str("k2".to_string()), Field::Null)
                    ]
                ),
                (
                    "int_Map_Array".to_string(),
                    list![
                        map![
                            (Field::Str("k3".to_string()), Field::Null),
                            (Field::Str("k1".to_string()), Field::Int(1))
                        ],
                        Field::Null,
                        map![]
                    ]
                ),
                (
                    "nested_struct".to_string(),
                    group![
                        ("A".to_string(), Field::Null),
                        ("b".to_string(), list![Field::Null]),
                        (
                            "C".to_string(),
                            group![(
                                "d".to_string(),
                                list![
                                    list![
                                        group![
                                            ("E".to_string(), Field::Null),
                                            ("F".to_string(), Field::Null)
                                        ],
                                        group![
                                            ("E".to_string(), Field::Int(10)),
                                            ("F".to_string(), Field::Str("aaa".to_string()))
                                        ],
                                        group![
                                            ("E".to_string(), Field::Null),
                                            ("F".to_string(), Field::Null)
                                        ],
                                        group![
                                            ("E".to_string(), Field::Int(-10)),
                                            ("F".to_string(), Field::Str("bbb".to_string()))
                                        ],
                                        group![
                                            ("E".to_string(), Field::Null),
                                            ("F".to_string(), Field::Null)
                                        ]
                                    ],
                                    list![
                                        group![
                                            ("E".to_string(), Field::Int(11)),
                                            ("F".to_string(), Field::Str("c".to_string()))
                                        ],
                                        Field::Null
                                    ],
                                    list![],
                                    Field::Null
                                ]
                            )]
                        ),
                        (
                            "g".to_string(),
                            map![
                                (
                                    Field::Str("g1".to_string()),
                                    group![(
                                        "H".to_string(),
                                        group![(
                                            "i".to_string(),
                                            list![Field::Double(2.2), Field::Null]
                                        )]
                                    )]
                                ),
                                (
                                    Field::Str("g2".to_string()),
                                    group![("H".to_string(), group![("i".to_string(), list![])])]
                                ),
                                (Field::Str("g3".to_string()), Field::Null),
                                (
                                    Field::Str("g4".to_string()),
                                    group![(
                                        "H".to_string(),
                                        group![("i".to_string(), Field::Null)]
                                    )]
                                ),
                                (
                                    Field::Str("g5".to_string()),
                                    group![("H".to_string(), Field::Null)]
                                )
                            ]
                        )
                    ]
                )
            ],
            // id 3: empty collections, plus lists whose only elements are null.
            row![
                ("id".to_string(), Field::Long(3)),
                ("int_array".to_string(), list![]),
                ("int_array_Array".to_string(), list![Field::Null]),
                ("int_map".to_string(), map![]),
                ("int_Map_Array".to_string(), list![Field::Null, Field::Null]),
                (
                    "nested_struct".to_string(),
                    group![
                        ("A".to_string(), Field::Null),
                        ("b".to_string(), Field::Null),
                        ("C".to_string(), group![("d".to_string(), list![])]),
                        ("g".to_string(), map![])
                    ]
                )
            ],
            // id 4: `int_array` is null, the other top-level collections are empty,
            // and the nested struct only carries null leaves.
            row![
                ("id".to_string(), Field::Long(4)),
                ("int_array".to_string(), Field::Null),
                ("int_array_Array".to_string(), list![]),
                ("int_map".to_string(), map![]),
                ("int_Map_Array".to_string(), list![]),
                (
                    "nested_struct".to_string(),
                    group![
                        ("A".to_string(), Field::Null),
                        ("b".to_string(), Field::Null),
                        ("C".to_string(), group![("d".to_string(), Field::Null)]),
                        ("g".to_string(), Field::Null)
                    ]
                )
            ],
            // id 5: list columns are null, `int_map` is empty, and only `g` is
            // populated inside the nested struct.
            row![
                ("id".to_string(), Field::Long(5)),
                ("int_array".to_string(), Field::Null),
                ("int_array_Array".to_string(), Field::Null),
                ("int_map".to_string(), map![]),
                ("int_Map_Array".to_string(), Field::Null),
                (
                    "nested_struct".to_string(),
                    group![
                        ("A".to_string(), Field::Null),
                        ("b".to_string(), Field::Null),
                        ("C".to_string(), Field::Null),
                        (
                            "g".to_string(),
                            map![(
                                Field::Str("foo".to_string()),
                                group![(
                                    "H".to_string(),
                                    group![(
                                        "i".to_string(),
                                        list![Field::Double(2.2), Field::Double(3.3)]
                                    )]
                                )]
                            )]
                        )
                    ]
                )
            ],
            // id 6: every column except `id` is null.
            row![
                ("id".to_string(), Field::Long(6)),
                ("int_array".to_string(), Field::Null),
                ("int_array_Array".to_string(), Field::Null),
                ("int_map".to_string(), Field::Null),
                ("int_Map_Array".to_string(), Field::Null),
                ("nested_struct".to_string(), Field::Null)
            ],
            // id 7: a mix of null columns, null list elements and map entries whose
            // values are null.
            row![
                ("id".to_string(), Field::Long(7)),
                ("int_array".to_string(), Field::Null),
                (
                    "int_array_Array".to_string(),
                    list![Field::Null, list![Field::Int(5), Field::Int(6)]]
                ),
                (
                    "int_map".to_string(),
                    map![
                        (Field::Str("k1".to_string()), Field::Null),
                        (Field::Str("k3".to_string()), Field::Null)
                    ]
                ),
                ("int_Map_Array".to_string(), Field::Null),
                (
                    "nested_struct".to_string(),
                    group![
                        ("A".to_string(), Field::Int(7)),
                        (
                            "b".to_string(),
                            list![Field::Int(2), Field::Int(3), Field::Null]
                        ),
                        (
                            "C".to_string(),
                            group![(
                                "d".to_string(),
                                list![list![], list![Field::Null], Field::Null]
                            )]
                        ),
                        ("g".to_string(), Field::Null)
                    ]
                )
            ],
        ];
        assert_eq!(rows, expected_rows);
    }
1273
1274    #[test]
1275    fn test_file_reader_rows_projection() {
1276        let schema = "
1277      message spark_schema {
1278        REQUIRED DOUBLE c;
1279        REQUIRED INT32 b;
1280      }
1281    ";
1282        let schema = parse_message_type(schema).unwrap();
1283        let rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1284        let expected_rows = vec![
1285            row![
1286                ("c".to_string(), Field::Double(1.0)),
1287                ("b".to_string(), Field::Int(1))
1288            ],
1289            row![
1290                ("c".to_string(), Field::Double(1.0)),
1291                ("b".to_string(), Field::Int(1))
1292            ],
1293            row![
1294                ("c".to_string(), Field::Double(1.0)),
1295                ("b".to_string(), Field::Int(1))
1296            ],
1297            row![
1298                ("c".to_string(), Field::Double(1.0)),
1299                ("b".to_string(), Field::Int(1))
1300            ],
1301            row![
1302                ("c".to_string(), Field::Double(1.0)),
1303                ("b".to_string(), Field::Int(1))
1304            ],
1305            row![
1306                ("c".to_string(), Field::Double(1.0)),
1307                ("b".to_string(), Field::Int(1))
1308            ],
1309        ];
1310        assert_eq!(rows, expected_rows);
1311    }
1312
1313    #[test]
1314    fn test_iter_columns_in_row() {
1315        let r = row![
1316            ("c".to_string(), Field::Double(1.0)),
1317            ("b".to_string(), Field::Int(1))
1318        ];
1319        let mut result = Vec::new();
1320        for (name, record) in r.get_column_iter() {
1321            result.push((name, record));
1322        }
1323        assert_eq!(
1324            vec![
1325                (&"c".to_string(), &Field::Double(1.0)),
1326                (&"b".to_string(), &Field::Int(1))
1327            ],
1328            result
1329        );
1330    }
1331
1332    #[test]
1333    fn test_into_columns_in_row() {
1334        let r = row![
1335            ("a".to_string(), Field::Str("My string".to_owned())),
1336            ("b".to_string(), Field::Int(1))
1337        ];
1338        assert_eq!(
1339            r.into_columns(),
1340            vec![
1341                ("a".to_string(), Field::Str("My string".to_owned())),
1342                ("b".to_string(), Field::Int(1)),
1343            ]
1344        );
1345    }
1346
1347    #[test]
1348    fn test_file_reader_rows_projection_map() {
1349        let schema = "
1350      message spark_schema {
1351        OPTIONAL group a (MAP) {
1352          REPEATED group key_value {
1353            REQUIRED BYTE_ARRAY key (UTF8);
1354            OPTIONAL group value (MAP) {
1355              REPEATED group key_value {
1356                REQUIRED INT32 key;
1357                REQUIRED BOOLEAN value;
1358              }
1359            }
1360          }
1361        }
1362      }
1363    ";
1364        let schema = parse_message_type(schema).unwrap();
1365        let rows = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1366        let expected_rows = vec![
1367            row![(
1368                "a".to_string(),
1369                map![(
1370                    Field::Str("a".to_string()),
1371                    map![
1372                        (Field::Int(1), Field::Bool(true)),
1373                        (Field::Int(2), Field::Bool(false))
1374                    ]
1375                )]
1376            )],
1377            row![(
1378                "a".to_string(),
1379                map![(
1380                    Field::Str("b".to_string()),
1381                    map![(Field::Int(1), Field::Bool(true))]
1382                )]
1383            )],
1384            row![(
1385                "a".to_string(),
1386                map![(Field::Str("c".to_string()), Field::Null)]
1387            )],
1388            row![("a".to_string(), map![(Field::Str("d".to_string()), map![])])],
1389            row![(
1390                "a".to_string(),
1391                map![(
1392                    Field::Str("e".to_string()),
1393                    map![(Field::Int(1), Field::Bool(true))]
1394                )]
1395            )],
1396            row![(
1397                "a".to_string(),
1398                map![(
1399                    Field::Str("f".to_string()),
1400                    map![
1401                        (Field::Int(3), Field::Bool(true)),
1402                        (Field::Int(4), Field::Bool(false)),
1403                        (Field::Int(5), Field::Bool(true))
1404                    ]
1405                )]
1406            )],
1407        ];
1408        assert_eq!(rows, expected_rows);
1409    }
1410
1411    #[test]
1412    fn test_file_reader_rows_projection_list() {
1413        let schema = "
1414      message spark_schema {
1415        OPTIONAL group a (LIST) {
1416          REPEATED group list {
1417            OPTIONAL group element (LIST) {
1418              REPEATED group list {
1419                OPTIONAL group element (LIST) {
1420                  REPEATED group list {
1421                    OPTIONAL BYTE_ARRAY element (UTF8);
1422                  }
1423                }
1424              }
1425            }
1426          }
1427        }
1428      }
1429    ";
1430        let schema = parse_message_type(schema).unwrap();
1431        let rows = test_file_reader_rows("nested_lists.snappy.parquet", Some(schema)).unwrap();
1432        let expected_rows = vec![
1433            row![(
1434                "a".to_string(),
1435                list![
1436                    list![
1437                        list![Field::Str("a".to_string()), Field::Str("b".to_string())],
1438                        list![Field::Str("c".to_string())]
1439                    ],
1440                    list![Field::Null, list![Field::Str("d".to_string())]]
1441                ]
1442            )],
1443            row![(
1444                "a".to_string(),
1445                list![
1446                    list![
1447                        list![Field::Str("a".to_string()), Field::Str("b".to_string())],
1448                        list![Field::Str("c".to_string()), Field::Str("d".to_string())]
1449                    ],
1450                    list![Field::Null, list![Field::Str("e".to_string())]]
1451                ]
1452            )],
1453            row![(
1454                "a".to_string(),
1455                list![
1456                    list![
1457                        list![Field::Str("a".to_string()), Field::Str("b".to_string())],
1458                        list![Field::Str("c".to_string()), Field::Str("d".to_string())],
1459                        list![Field::Str("e".to_string())]
1460                    ],
1461                    list![Field::Null, list![Field::Str("f".to_string())]]
1462                ]
1463            )],
1464        ];
1465        assert_eq!(rows, expected_rows);
1466    }
1467
1468    #[test]
1469    fn test_file_reader_rows_invalid_projection() {
1470        let schema = "
1471      message spark_schema {
1472        REQUIRED INT32 key;
1473        REQUIRED BOOLEAN value;
1474      }
1475    ";
1476        let schema = parse_message_type(schema).unwrap();
1477        let res = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema));
1478        assert_eq!(
1479            res.unwrap_err().to_string(),
1480            "Parquet error: Root schema does not contain projection"
1481        );
1482    }
1483
1484    #[test]
1485    fn test_row_group_rows_invalid_projection() {
1486        let schema = "
1487      message spark_schema {
1488        REQUIRED INT32 key;
1489        REQUIRED BOOLEAN value;
1490      }
1491    ";
1492        let schema = parse_message_type(schema).unwrap();
1493        let res = test_row_group_rows("nested_maps.snappy.parquet", Some(schema));
1494        assert_eq!(
1495            res.unwrap_err().to_string(),
1496            "Parquet error: Root schema does not contain projection"
1497        );
1498    }
1499
1500    #[test]
1501    fn test_file_reader_rows_nested_map_type() {
1502        let schema = "
1503      message spark_schema {
1504        OPTIONAL group a (MAP) {
1505          REPEATED group key_value {
1506            REQUIRED BYTE_ARRAY key (UTF8);
1507            OPTIONAL group value (MAP) {
1508              REPEATED group key_value {
1509                REQUIRED INT32 key;
1510              }
1511            }
1512          }
1513        }
1514      }
1515    ";
1516        let schema = parse_message_type(schema).unwrap();
1517        test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap();
1518    }
1519
1520    #[test]
1521    fn test_file_reader_iter() {
1522        let path = get_test_path("alltypes_plain.parquet");
1523        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1524        let iter = RowIter::from_file_into(Box::new(reader));
1525
1526        let values: Vec<_> = iter.flat_map(|r| r.unwrap().get_int(0)).collect();
1527        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1528    }
1529
1530    #[test]
1531    fn test_file_reader_iter_projection() {
1532        let path = get_test_path("alltypes_plain.parquet");
1533        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1534        let schema = "message schema { OPTIONAL INT32 id; }";
1535        let proj = parse_message_type(schema).ok();
1536
1537        let iter = RowIter::from_file_into(Box::new(reader))
1538            .project(proj)
1539            .unwrap();
1540        let values: Vec<_> = iter.flat_map(|r| r.unwrap().get_int(0)).collect();
1541
1542        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1543    }
1544
1545    #[test]
1546    fn test_file_reader_iter_projection_err() {
1547        let schema = "
1548      message spark_schema {
1549        REQUIRED INT32 key;
1550        REQUIRED BOOLEAN value;
1551      }
1552    ";
1553        let proj = parse_message_type(schema).ok();
1554        let path = get_test_path("nested_maps.snappy.parquet");
1555        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1556        let res = RowIter::from_file_into(Box::new(reader)).project(proj);
1557
1558        assert_eq!(
1559            res.err().unwrap().to_string(),
1560            "Parquet error: Root schema does not contain projection"
1561        );
1562    }
1563
1564    #[test]
1565    fn test_tree_reader_handle_repeated_fields_with_no_annotation() {
1566        // Array field `phoneNumbers` does not contain LIST annotation.
1567        // We parse it as struct with `phone` repeated field as array.
1568        let rows = test_file_reader_rows("repeated_no_annotation.parquet", None).unwrap();
1569        let expected_rows = vec![
1570            row![
1571                ("id".to_string(), Field::Int(1)),
1572                ("phoneNumbers".to_string(), Field::Null)
1573            ],
1574            row![
1575                ("id".to_string(), Field::Int(2)),
1576                ("phoneNumbers".to_string(), Field::Null)
1577            ],
1578            row![
1579                ("id".to_string(), Field::Int(3)),
1580                (
1581                    "phoneNumbers".to_string(),
1582                    group![("phone".to_string(), list![])]
1583                )
1584            ],
1585            row![
1586                ("id".to_string(), Field::Int(4)),
1587                (
1588                    "phoneNumbers".to_string(),
1589                    group![(
1590                        "phone".to_string(),
1591                        list![group![
1592                            ("number".to_string(), Field::Long(5555555555)),
1593                            ("kind".to_string(), Field::Null)
1594                        ]]
1595                    )]
1596                )
1597            ],
1598            row![
1599                ("id".to_string(), Field::Int(5)),
1600                (
1601                    "phoneNumbers".to_string(),
1602                    group![(
1603                        "phone".to_string(),
1604                        list![group![
1605                            ("number".to_string(), Field::Long(1111111111)),
1606                            ("kind".to_string(), Field::Str("home".to_string()))
1607                        ]]
1608                    )]
1609                )
1610            ],
1611            row![
1612                ("id".to_string(), Field::Int(6)),
1613                (
1614                    "phoneNumbers".to_string(),
1615                    group![(
1616                        "phone".to_string(),
1617                        list![
1618                            group![
1619                                ("number".to_string(), Field::Long(1111111111)),
1620                                ("kind".to_string(), Field::Str("home".to_string()))
1621                            ],
1622                            group![
1623                                ("number".to_string(), Field::Long(2222222222)),
1624                                ("kind".to_string(), Field::Null)
1625                            ],
1626                            group![
1627                                ("number".to_string(), Field::Long(3333333333)),
1628                                ("kind".to_string(), Field::Str("mobile".to_string()))
1629                            ]
1630                        ]
1631                    )]
1632                )
1633            ],
1634        ];
1635
1636        assert_eq!(rows, expected_rows);
1637    }
1638
1639    #[test]
1640    fn test_tree_reader_handle_nested_repeated_fields_with_no_annotation() {
1641        // Create schema
1642        let schema = Arc::new(
1643            parse_message_type(
1644                "
1645            message schema {
1646                REPEATED group level1 {
1647                    REPEATED group level2 {
1648                        REQUIRED group level3 {
1649                            REQUIRED INT64 value3;
1650                        }
1651                    }
1652                    REQUIRED INT64 value1;
1653                }
1654            }",
1655            )
1656            .unwrap(),
1657        );
1658
1659        // Write Parquet file to buffer
1660        let mut buffer: Vec<u8> = Vec::new();
1661        let mut file_writer =
1662            SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
1663        let mut row_group_writer = file_writer.next_row_group().unwrap();
1664
1665        // Write column level1.level2.level3.value3
1666        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
1667        column_writer
1668            .typed::<Int64Type>()
1669            .write_batch(&[30, 31, 32], Some(&[2, 2, 2]), Some(&[0, 0, 0]))
1670            .unwrap();
1671        column_writer.close().unwrap();
1672
1673        // Write column level1.value1
1674        let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
1675        column_writer
1676            .typed::<Int64Type>()
1677            .write_batch(&[10, 11, 12], Some(&[1, 1, 1]), Some(&[0, 0, 0]))
1678            .unwrap();
1679        column_writer.close().unwrap();
1680
1681        // Finalize Parquet file
1682        row_group_writer.close().unwrap();
1683        file_writer.close().unwrap();
1684        assert_eq!(&buffer[0..4], b"PAR1");
1685
1686        // Read Parquet file from buffer
1687        let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
1688        let rows: Vec<_> = file_reader
1689            .get_row_iter(None)
1690            .unwrap()
1691            .map(|row| row.unwrap())
1692            .collect();
1693
1694        let expected_rows = vec![
1695            row![(
1696                "level1".to_string(),
1697                list![group![
1698                    (
1699                        "level2".to_string(),
1700                        list![group![(
1701                            "level3".to_string(),
1702                            group![("value3".to_string(), Field::Long(30))]
1703                        )]]
1704                    ),
1705                    ("value1".to_string(), Field::Long(10))
1706                ]]
1707            )],
1708            row![(
1709                "level1".to_string(),
1710                list![group![
1711                    (
1712                        "level2".to_string(),
1713                        list![group![(
1714                            "level3".to_string(),
1715                            group![("value3".to_string(), Field::Long(31))]
1716                        )]]
1717                    ),
1718                    ("value1".to_string(), Field::Long(11))
1719                ]]
1720            )],
1721            row![(
1722                "level1".to_string(),
1723                list![group![
1724                    (
1725                        "level2".to_string(),
1726                        list![group![(
1727                            "level3".to_string(),
1728                            group![("value3".to_string(), Field::Long(32))]
1729                        )]]
1730                    ),
1731                    ("value1".to_string(), Field::Long(12))
1732                ]]
1733            )],
1734        ];
1735
1736        assert_eq!(rows, expected_rows);
1737    }
1738
1739    #[test]
1740    fn test_tree_reader_handle_primitive_repeated_fields_with_no_annotation() {
1741        // In this test the REPEATED fields are primitives
1742        let rows = test_file_reader_rows("repeated_primitive_no_list.parquet", None).unwrap();
1743        let expected_rows = vec![
1744            row![
1745                (
1746                    "Int32_list".to_string(),
1747                    Field::ListInternal(make_list([0, 1, 2, 3].map(Field::Int).to_vec()))
1748                ),
1749                (
1750                    "String_list".to_string(),
1751                    Field::ListInternal(make_list(
1752                        ["foo", "zero", "one", "two"]
1753                            .map(|s| Field::Str(s.to_string()))
1754                            .to_vec()
1755                    ))
1756                ),
1757                (
1758                    "group_of_lists".to_string(),
1759                    group![
1760                        (
1761                            "Int32_list_in_group".to_string(),
1762                            Field::ListInternal(make_list([0, 1, 2, 3].map(Field::Int).to_vec()))
1763                        ),
1764                        (
1765                            "String_list_in_group".to_string(),
1766                            Field::ListInternal(make_list(
1767                                ["foo", "zero", "one", "two"]
1768                                    .map(|s| Field::Str(s.to_string()))
1769                                    .to_vec()
1770                            ))
1771                        )
1772                    ]
1773                )
1774            ],
1775            row![
1776                (
1777                    "Int32_list".to_string(),
1778                    Field::ListInternal(make_list(vec![]))
1779                ),
1780                (
1781                    "String_list".to_string(),
1782                    Field::ListInternal(make_list(
1783                        ["three"].map(|s| Field::Str(s.to_string())).to_vec()
1784                    ))
1785                ),
1786                (
1787                    "group_of_lists".to_string(),
1788                    group![
1789                        (
1790                            "Int32_list_in_group".to_string(),
1791                            Field::ListInternal(make_list(vec![]))
1792                        ),
1793                        (
1794                            "String_list_in_group".to_string(),
1795                            Field::ListInternal(make_list(
1796                                ["three"].map(|s| Field::Str(s.to_string())).to_vec()
1797                            ))
1798                        )
1799                    ]
1800                )
1801            ],
1802            row![
1803                (
1804                    "Int32_list".to_string(),
1805                    Field::ListInternal(make_list(vec![Field::Int(4)]))
1806                ),
1807                (
1808                    "String_list".to_string(),
1809                    Field::ListInternal(make_list(
1810                        ["four"].map(|s| Field::Str(s.to_string())).to_vec()
1811                    ))
1812                ),
1813                (
1814                    "group_of_lists".to_string(),
1815                    group![
1816                        (
1817                            "Int32_list_in_group".to_string(),
1818                            Field::ListInternal(make_list(vec![Field::Int(4)]))
1819                        ),
1820                        (
1821                            "String_list_in_group".to_string(),
1822                            Field::ListInternal(make_list(
1823                                ["four"].map(|s| Field::Str(s.to_string())).to_vec()
1824                            ))
1825                        )
1826                    ]
1827                )
1828            ],
1829            row![
1830                (
1831                    "Int32_list".to_string(),
1832                    Field::ListInternal(make_list([5, 6, 7, 8].map(Field::Int).to_vec()))
1833                ),
1834                (
1835                    "String_list".to_string(),
1836                    Field::ListInternal(make_list(
1837                        ["five", "six", "seven", "eight"]
1838                            .map(|s| Field::Str(s.to_string()))
1839                            .to_vec()
1840                    ))
1841                ),
1842                (
1843                    "group_of_lists".to_string(),
1844                    group![
1845                        (
1846                            "Int32_list_in_group".to_string(),
1847                            Field::ListInternal(make_list([5, 6, 7, 8].map(Field::Int).to_vec()))
1848                        ),
1849                        (
1850                            "String_list_in_group".to_string(),
1851                            Field::ListInternal(make_list(
1852                                ["five", "six", "seven", "eight"]
1853                                    .map(|s| Field::Str(s.to_string()))
1854                                    .to_vec()
1855                            ))
1856                        )
1857                    ]
1858                )
1859            ],
1860        ];
1861        assert_eq!(rows, expected_rows);
1862    }
1863
1864    #[test]
1865    fn test_map_no_value() {
1866        // File schema:
1867        // message schema {
1868        //   required group my_map (MAP) {
1869        //     repeated group key_value {
1870        //       required int32 key;
1871        //       optional int32 value;
1872        //     }
1873        //   }
1874        //   required group my_map_no_v (MAP) {
1875        //     repeated group key_value {
1876        //       required int32 key;
1877        //     }
1878        //   }
1879        //   required group my_list (LIST) {
1880        //     repeated group list {
1881        //       required int32 element;
1882        //     }
1883        //   }
1884        // }
1885        let rows = test_file_reader_rows("map_no_value.parquet", None).unwrap();
1886
1887        // the my_map_no_v and my_list columns should be equivalent lists by this point
1888        for row in rows {
1889            let cols = row.into_columns();
1890            assert_eq!(cols[1].1, cols[2].1);
1891        }
1892    }
1893
1894    fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
1895        let file = get_test_file(file_name);
1896        let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
1897        let iter = file_reader.get_row_iter(schema)?;
1898        Ok(iter.map(|row| row.unwrap()).collect())
1899    }
1900
1901    fn test_row_group_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
1902        let file = get_test_file(file_name);
1903        let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
1904        // Check the first row group only, because files will contain only single row
1905        // group
1906        let row_group_reader = file_reader.get_row_group(0).unwrap();
1907        let iter = row_group_reader.get_row_iter(schema)?;
1908        Ok(iter.map(|row| row.unwrap()).collect())
1909    }
1910
1911    #[test]
1912    fn test_read_old_nested_list() {
1913        let rows = test_file_reader_rows("old_list_structure.parquet", None).unwrap();
1914        let expected_rows = vec![row![(
1915            "a".to_string(),
1916            Field::ListInternal(make_list(
1917                [
1918                    make_list([1, 2].map(Field::Int).to_vec()),
1919                    make_list([3, 4].map(Field::Int).to_vec())
1920                ]
1921                .map(Field::ListInternal)
1922                .to_vec()
1923            ))
1924        ),]];
1925        assert_eq!(rows, expected_rows);
1926    }
1927
1928    fn assert_err_on_overcount(file_name: &str, proj_schema: Option<Type>) {
1929        let file = get_test_file(file_name);
1930        let file_reader = SerializedFileReader::new(file).unwrap();
1931        let metadata = file_reader.metadata();
1932        let row_group_reader = file_reader.get_row_group(0).unwrap();
1933        let actual_rows = row_group_reader.metadata().num_rows() as usize;
1934
1935        let descr = match proj_schema {
1936            Some(schema) => Arc::new(SchemaDescriptor::new(Arc::new(schema))),
1937            None => metadata.file_metadata().schema_descr_ptr(),
1938        };
1939        let reader = TreeBuilder::new().build(descr, &*row_group_reader).unwrap();
1940        let iter = ReaderIter::new(reader, actual_rows + 1).unwrap();
1941
1942        let rows: Vec<Result<Row>> = iter.collect();
1943        assert_eq!(rows.len(), actual_rows + 1);
1944        for row in &rows[..actual_rows] {
1945            assert!(row.is_ok(), "Expected Ok row, got: {:?}", row);
1946        }
1947        let err = rows[actual_rows].as_ref().unwrap_err();
1948        assert!(
1949            err.to_string().contains("Unexpected end of column data"),
1950            "Unexpected error message: {}",
1951            err
1952        );
1953    }
1954
1955    #[test]
1956    fn test_reader_iter_returns_error_when_num_records_exceeds_data() {
1957        assert_err_on_overcount("nulls.snappy.parquet", None);
1958    }
1959
1960    #[test]
1961    fn test_reader_iter_returns_error_for_repeated_field_when_num_records_exceeds_data() {
1962        assert_err_on_overcount("repeated_primitive_no_list.parquet", None);
1963    }
1964
1965    #[test]
1966    fn test_reader_iter_returns_error_for_map_field_when_num_records_exceeds_data() {
1967        let schema = parse_message_type(
1968            "message schema {
1969               REQUIRED group my_map (MAP) {
1970                 REPEATED group key_value {
1971                   REQUIRED INT32 key;
1972                   OPTIONAL INT32 value;
1973                 }
1974               }
1975             }",
1976        )
1977        .unwrap();
1978        assert_err_on_overcount("map_no_value.parquet", Some(schema));
1979    }
1980}