parquet/file/serialized_reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains implementations of the reader traits [`FileReader`], [`RowGroupReader`] and [`PageReader`].
//! Also contains implementations of [`ChunkReader`] for files (with buffering) and byte arrays (RAM).
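//!
//! # Example
//!
//! A minimal sketch of reading every row in a file with the serialized reader
//! (the path `data.parquet` is a placeholder):
//!
//! ```no_run
//! use std::fs::File;
//!
//! use parquet::file::reader::{FileReader, SerializedFileReader};
//!
//! let file = File::open("data.parquet").unwrap();
//! let reader = SerializedFileReader::new(file).unwrap();
//! // Iterate all rows using the full file schema (no projection)
//! for row in reader.get_row_iter(None).unwrap() {
//!     println!("{}", row.unwrap());
//! }
//! ```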

use crate::basic::{PageType, Type};
use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{Codec, create_codec};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::{CryptoContext, read_and_decrypt};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::thrift::PageHeader;
use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
use crate::file::statistics;
use crate::file::{
    metadata::*,
    properties::{ReaderProperties, ReaderPropertiesPtr},
    reader::*,
};
#[cfg(feature = "encryption")]
use crate::parquet_thrift::ThriftSliceInputProtocol;
use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
use crate::record::Row;
use crate::record::reader::RowIter;
use crate::schema::types::Type as SchemaType;
use bytes::Bytes;
use std::collections::VecDeque;
use std::{fs::File, io::Read, path::Path, sync::Arc};

impl TryFrom<File> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(file: File) -> Result<Self> {
        Self::new(file)
    }
}

impl TryFrom<&Path> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &Path) -> Result<Self> {
        let file = File::open(path)?;
        Self::try_from(file)
    }
}

impl TryFrom<String> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: String) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

impl TryFrom<&str> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &str) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

/// Conversion into a [`RowIter`]
/// using the full file schema over all row groups.
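///
/// A minimal sketch (`reader` is assumed to be a `SerializedFileReader<File>`):
///
/// ```ignore
/// for row in reader {
///     println!("{}", row.unwrap());
/// }
/// ```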
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        RowIter::from_file_into(Box::new(self))
    }
}

// ----------------------------------------------------------------------
// Implementations of file & row group readers

/// A serialized implementation for Parquet [`FileReader`].
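///
/// # Example
///
/// A minimal sketch of opening a file and inspecting its metadata
/// (the path is a placeholder):
///
/// ```no_run
/// use std::fs::File;
///
/// use parquet::file::reader::{FileReader, SerializedFileReader};
///
/// let file = File::open("data.parquet").unwrap();
/// let reader = SerializedFileReader::new(file).unwrap();
/// println!("num row groups: {}", reader.num_row_groups());
/// ```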
pub struct SerializedFileReader<R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: Arc<ParquetMetaData>,
    props: ReaderPropertiesPtr,
}

/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;

/// A builder for [`ReadOptions`].
///
/// Predicates added to the builder are chained with 'AND' when filtering row groups.
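///
/// # Example
///
/// A minimal sketch of building read options that keep only non-empty row
/// groups whose midpoints fall in the first GiB of the file:
///
/// ```
/// use parquet::file::metadata::RowGroupMetaData;
/// use parquet::file::serialized_reader::ReadOptionsBuilder;
///
/// let options = ReadOptionsBuilder::new()
///     .with_predicate(Box::new(|rg: &RowGroupMetaData, _idx: usize| rg.num_rows() > 0))
///     .with_range(0, 1024 * 1024 * 1024)
///     .build();
/// ```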
#[derive(Default)]
pub struct ReadOptionsBuilder {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: Option<ReaderProperties>,
}

impl ReadOptionsBuilder {
    /// New builder
    pub fn new() -> Self {
        Self::default()
    }

    /// Add a predicate on row group metadata to the read options.
    /// Only row groups that match the predicate will be scanned.
    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
        self.predicates.push(predicate);
        self
    }

    /// Add a range predicate that keeps only row groups whose midpoint offsets
    /// fall within the half-open range `[start, end)`, i.e. `{x | start <= x < end}`.
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        assert!(start < end);
        let predicate = move |rg: &RowGroupMetaData, _: usize| {
            let mid = get_midpoint_offset(rg);
            mid >= start && mid < end
        };
        self.predicates.push(Box::new(predicate));
        self
    }

    /// Enable reading the page index structures described in
    /// "[Column Index] Layout to Support Page Skipping"
    ///
    /// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    pub fn with_page_index(mut self) -> Self {
        self.enable_page_index = true;
        self
    }

    /// Set the [`ReaderProperties`] configuration.
    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
        self.props = Some(properties);
        self
    }

    /// Seal the builder and return the read options
    pub fn build(self) -> ReadOptions {
        let props = self
            .props
            .unwrap_or_else(|| ReaderProperties::builder().build());
        ReadOptions {
            predicates: self.predicates,
            enable_page_index: self.enable_page_index,
            props,
        }
    }
}

/// A collection of options for reading a Parquet file.
///
/// Currently, only predicates on row group metadata are supported.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: ReaderProperties,
}

impl<R: 'static + ChunkReader> SerializedFileReader<R> {
    /// Creates a file reader from a Parquet file.
    /// Returns an error if the Parquet file does not exist or is corrupt.
    pub fn new(chunk_reader: R) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
        let props = Arc::new(ReaderProperties::builder().build());
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props,
        })
    }

    /// Creates a file reader from a Parquet file with read options.
    /// Returns an error if the Parquet file does not exist or is corrupt.
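    ///
    /// A minimal sketch (the path is a placeholder):
    ///
    /// ```no_run
    /// use std::fs::File;
    ///
    /// use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};
    ///
    /// let file = File::open("data.parquet").unwrap();
    /// let options = ReadOptionsBuilder::new().with_page_index().build();
    /// let reader = SerializedFileReader::new_with_options(file, options).unwrap();
    /// ```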
    #[allow(deprecated)]
    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
        let mut metadata_builder = ParquetMetaDataReader::new()
            .parse_and_finish(&chunk_reader)?
            .into_builder();
        let mut predicates = options.predicates;

        // Filter row groups based on the predicates
        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
            let mut keep = true;
            for predicate in &mut predicates {
                if !predicate(&rg_meta, i) {
                    keep = false;
                    break;
                }
            }
            if keep {
                metadata_builder = metadata_builder.add_row_group(rg_meta);
            }
        }

        let mut metadata = metadata_builder.build();

        // If page indexes are desired, build them with the filtered set of row groups
        if options.enable_page_index {
            let mut reader =
                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
            reader.read_page_indexes(&chunk_reader)?;
            metadata = reader.finish()?;
        }

        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props: Arc::new(options.props),
        })
    }
}

/// Returns the midpoint offset of a row group: the file offset where its first
/// column's data begins (its dictionary page, if present) plus half of the row
/// group's total compressed size.
fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
    let col = meta.column(0);
    let mut offset = col.data_page_offset();
    if let Some(dic_offset) = col.dictionary_page_offset() {
        if offset > dic_offset {
            offset = dic_offset
        }
    };
    offset + meta.compressed_size() / 2
}

impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
    fn metadata(&self) -> &ParquetMetaData {
        &self.metadata
    }

    fn num_row_groups(&self) -> usize {
        self.metadata.num_row_groups()
    }

    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
        let row_group_metadata = self.metadata.row_group(i);
        // Row groups should be processed sequentially.
        let props = Arc::clone(&self.props);
        let f = Arc::clone(&self.chunk_reader);
        Ok(Box::new(SerializedRowGroupReader::new(
            f,
            row_group_metadata,
            self.metadata.offset_index().map(|x| x[i].as_slice()),
            props,
        )?))
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_file(projection, self)
    }
}

/// A serialized implementation for Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: &'a RowGroupMetaData,
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    props: ReaderPropertiesPtr,
    bloom_filters: Vec<Option<Sbbf>>,
}

impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
    /// Creates a new row group reader from a chunk reader, row group metadata
    /// and reader properties.
    pub fn new(
        chunk_reader: Arc<R>,
        metadata: &'a RowGroupMetaData,
        offset_index: Option<&'a [OffsetIndexMetaData]>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let bloom_filters = if props.read_bloom_filter() {
            metadata
                .columns()
                .iter()
                .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
                .collect::<Result<Vec<_>>>()?
        } else {
            std::iter::repeat_n(None, metadata.columns().len()).collect()
        };
        Ok(Self {
            chunk_reader,
            metadata,
            offset_index,
            props,
            bloom_filters,
        })
    }
}

impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
    fn metadata(&self) -> &RowGroupMetaData {
        self.metadata
    }

    fn num_columns(&self) -> usize {
        self.metadata.num_columns()
    }

    // TODO: fix PARQUET-816
    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
        let col = self.metadata.column(i);

        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());

        let props = Arc::clone(&self.props);
        Ok(Box::new(SerializedPageReader::new_with_properties(
            Arc::clone(&self.chunk_reader),
            col,
            usize::try_from(self.metadata.num_rows())?,
            page_locations,
            props,
        )?))
    }

    /// Get the bloom filter for the `i`th column
    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
        self.bloom_filters[i].as_ref()
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_row_group(projection, self)
    }
}

/// Decodes a [`Page`] from the provided `buffer`
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the 32-bit CRC checksum of the page
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // When processing a data page v2, depending on whether compression is
    // enabled for the page, we should account for the uncompressed prefix
    // ('offset') holding the repetition and definition levels.
    //
    // For pages other than v2 the offset is always 0; `can_decompress == true`
    // means decompression will be applied if a decompressor is provided.
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                    for definition_levels_byte_length ({}) \
                    and repetition_levels_byte_length ({}) \
                    given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        // When the is_compressed flag is missing, the page is considered compressed
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    // TODO: page header could be huge because of statistics. We should set a
    // maximum page header size and abort if that is exceeded.
    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            if offset > buffer.len() || offset > uncompressed_page_size {
                return Err(general_err!("Invalid page header"));
            }
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
            if decompressed_size > 0 {
                let compressed = &buffer.as_ref()[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        _ => buffer,
    };

    let result = match page_header.r#type {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: dict_header.encoding,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                def_level_encoding: header.definition_level_encoding,
                rep_level_encoding: header.repetition_level_encoding,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        _ => {
            // Unsupported page types (e.g., INDEX_PAGE) are skipped by the
            // page readers before decoding, so reaching here is an error.
            return Err(general_err!(
                "Page type {:?} is not supported",
                page_header.r#type
            ));
        }
    };

    Ok(result)
}

enum SerializedPageReaderState {
    Values {
        /// The current byte offset in the reader
        /// Note that offset is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        offset: u64,

        /// The length of the chunk in bytes
        /// Note that remaining_bytes is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        remaining_bytes: u64,

        /// If the next page header has already been "peeked", it is cached here
        next_page_header: Option<Box<PageHeader>>,

        /// The index of the data page within this column chunk
        page_index: usize,

        /// Whether the next page is expected to be a dictionary page
        require_dictionary: bool,
    },
    Pages {
        /// Remaining page locations
        page_locations: VecDeque<PageLocation>,
        /// Remaining dictionary location if any
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
        /// The index of the data page within this column chunk
        page_index: usize,
    },
}

#[derive(Default)]
struct SerializedPageReaderContext {
    /// Controls decoding of page-level statistics
    read_stats: bool,
    /// Crypto context carrying objects required for decryption
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}

/// A serialized implementation for Parquet [`PageReader`].
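///
/// # Example
///
/// A minimal sketch of iterating the pages of the first column chunk of the
/// first row group (`chunk_reader` and its parsed `metadata` are assumed to
/// already exist):
///
/// ```ignore
/// let row_group = metadata.row_group(0);
/// let page_reader = SerializedPageReader::new(
///     Arc::clone(&chunk_reader),        // Arc<impl ChunkReader>
///     row_group.column(0),
///     row_group.num_rows() as usize,
///     None,                             // no page locations from an offset index
/// )?;
/// for page in page_reader {
///     println!("page type: {:?}", page?.page_type());
/// }
/// ```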
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader
    reader: Arc<R>,

    /// The compression codec for this column chunk. Only set for compressed
    /// (non-UNCOMPRESSED) codecs.
    decompressor: Option<Box<dyn Codec>>,

    /// Column chunk type.
    physical_type: Type,

    state: SerializedPageReaderState,

    context: SerializedPageReaderContext,
}

impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and metadata
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// No-op stub implementation used when encryption is disabled.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Adds any necessary crypto context to this page reader, if encryption is enabled.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.context.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page reader with custom options.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // If the offset of the first page doesn't match the start of the column chunk
                // then the preceding space must contain a dictionary page.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start,
                remaining_bytes: len,
                next_page_header: None,
                page_index: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        let mut context = SerializedPageReaderContext::default();
        if props.read_page_stats() {
            context.read_stats = true;
        }
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context,
        })
    }

    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
    /// Unlike page metadata, an offset can uniquely identify a page.
    ///
    /// This is used when reading Parquet with a row filter, where we don't want to decompress
    /// a page twice. It lets us check whether the next page has already been cached or read.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as u64))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as u64))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn read_page_header_len<T: Read>(
        context: &SerializedPageReaderContext,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
        struct TrackedRead<R> {
            inner: R,
            bytes_read: usize,
        }

        impl<R: Read> Read for TrackedRead<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let v = self.inner.read(buf)?;
                self.bytes_read += v;
                Ok(v)
            }
        }

        let mut tracked = TrackedRead {
            inner: input,
            bytes_read: 0,
        };
        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
        Ok((tracked.bytes_read, header))
    }

    fn read_page_header_len_from_bytes(
        context: &SerializedPageReaderContext,
        buffer: &[u8],
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        let mut input = std::io::Cursor::new(buffer);
        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
        let header_len = input.position() as usize;
        Ok((header_len, header))
    }
}

#[cfg(not(feature = "encryption"))]
impl SerializedPageReaderContext {
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        _page_index: usize,
        _dictionary_page: bool,
    ) -> Result<PageHeader> {
        let mut prot = ThriftReadInputProtocol::new(input);
        if self.read_stats {
            Ok(PageHeader::read_thrift(&mut prot)?)
        } else {
            Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
        }
    }

    fn decrypt_page_data<T>(
        &self,
        buffer: T,
        _page_index: usize,
        _dictionary_page: bool,
    ) -> Result<T> {
        Ok(buffer)
    }
}

#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                let mut prot = ThriftReadInputProtocol::new(input);
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
        }
    }

    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}

impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
    type Item = Result<Page>;

    fn next(&mut self) -> Option<Self::Item> {
        self.get_next_page().transpose()
    }
}

fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
    if header_len as u64 > remaining_bytes {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

fn verify_page_size(
    compressed_size: i32,
    uncompressed_size: i32,
    remaining_bytes: u64,
) -> Result<()> {
    // The page's compressed size should not exceed the remaining bytes that are
    // available to read. The page's uncompressed size is the expected size
    // after decompression, which can never be negative.
    if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset)?;
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len as u64;
                        *remaining -= header_len as u64;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    *offset += data_len as u64;
                    *remaining -= data_len as u64;

                    // Index pages carry no values; skip them and read the next page.
                    if header.r#type == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The next page header has already been peeked, so just advance the offset
                    *offset += buffered_header.compressed_page_size as u64;
                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
                } else {
                    let mut read = self.reader.get_read(*offset)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    let data_page_size = header.compressed_page_size as u64;
                    *offset += header_len as u64 + data_page_size;
                    *remaining_bytes -= header_len as u64 + data_page_size;
                }
                if *require_dictionary {
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // If a dictionary page exists, consume it by taking it (sets to None)
                    dictionary_page.take();
                } else {
                    // If no dictionary page exists, simply pop the data page from page_locations
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Buf;

    use crate::file::page_index::column_index::{
        ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
    };
    use crate::file::properties::{EnabledStatistics, WriterProperties};

    use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
    use crate::column::reader::ColumnReader;
    use crate::data_type::private::ParquetValueType;
    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
    use crate::file::metadata::thrift::DataPageHeaderV2;
    #[allow(deprecated)]
    use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
    use crate::file::writer::SerializedFileWriter;
    use crate::record::RowAccessor;
    use crate::schema::parser::parse_message_type;
    use crate::util::test_common::file_util::{get_test_file, get_test_path};

    use super::*;

    #[test]
    fn test_decode_page_invalid_offset() {
        let page_header = PageHeader {
            r#type: PageType::DATA_PAGE_V2,
            uncompressed_page_size: 10,
            compressed_page_size: 10,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: None,
            crc: None,
            data_page_header_v2: Some(DataPageHeaderV2 {
                num_nulls: 0,
                num_rows: 0,
                num_values: 0,
                encoding: Encoding::PLAIN,
                definition_levels_byte_length: 11,
                repetition_levels_byte_length: 0,
                is_compressed: None,
                statistics: None,
            }),
        };

        let buffer = Bytes::new();
        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
        assert!(
            err.to_string()
                .contains("DataPage v2 header contains implausible values")
        );
    }

    #[test]
    fn test_decode_unsupported_page() {
        let mut page_header = PageHeader {
            r#type: PageType::INDEX_PAGE,
            uncompressed_page_size: 10,
            compressed_page_size: 10,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: None,
            crc: None,
            data_page_header_v2: None,
        };
        let buffer = Bytes::new();
        let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: Page type INDEX_PAGE is not supported"
        );

        page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
            num_nulls: 0,
            num_rows: 0,
            num_values: 0,
            encoding: Encoding::PLAIN,
            definition_levels_byte_length: 11,
            repetition_levels_byte_length: 0,
            is_compressed: None,
            statistics: None,
        });
        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
        assert!(
            err.to_string()
                .contains("DataPage v2 header contains implausible values")
        );
    }

    #[test]
    fn test_cursor_and_file_has_the_same_behaviour() {
        let mut buf: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut buf)
            .unwrap();
        let cursor = Bytes::from(buf);
        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_from_file = SerializedFileReader::new(test_file).unwrap();

        let file_iter = read_from_file.get_row_iter(None).unwrap();
        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();

        for (a, b) in file_iter.zip(cursor_iter) {
            assert_eq!(a.unwrap(), b.unwrap())
        }
    }

    #[test]
    fn test_file_reader_try_from() {
        // Valid file path
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        // Invalid file path
        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }

    #[test]
    fn test_file_reader_into_iter() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let iter = reader.into_iter();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[test]
    fn test_file_reader_into_iter_project() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let schema = "message schema { OPTIONAL INT32 id; }";
        let proj = parse_message_type(schema).ok();
        let iter = reader.into_iter().project(proj).unwrap();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[test]
    fn test_reuse_file_chunk() {
        // This test covers the case of maintaining the correct start position in a file
        // stream for each column reader after initializing and moving to the next one
        // (without necessarily reading the entire column).
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file).unwrap();
        let row_group = reader.get_row_group(0).unwrap();

        let mut page_readers = Vec::new();
        for i in 0..row_group.num_columns() {
            page_readers.push(row_group.get_column_page_reader(i).unwrap());
        }

        // Now buffer each column reader; we do not expect any failures like:
        // General("underlying Thrift error: end of file")
        for mut page_reader in page_readers {
            assert!(page_reader.get_next_page().is_ok());
        }
    }

    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Test contents in row group metadata
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none()); // page stats are no longer read
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
1500    }
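
        // In v2 data pages the definition/repetition level bytes are stored
        // uncompressed ahead of the (possibly compressed) values, so the values'
        // compressed length is the page's compressed size minus the level byte
        // lengths. A sketch of that arithmetic over the fields asserted above;
        // `v2_values_compressed_len` is a hypothetical helper, not used by the
        // tests, and assumes well-formed (non-underflowing) sizes.
        #[allow(dead_code)]
        fn v2_values_compressed_len(
            compressed_page_size: usize,
            def_levels_byte_len: usize,
            rep_levels_byte_len: usize,
        ) -> usize {
            // level bytes are never compressed in v2 pages, only the values are
            compressed_page_size - def_levels_byte_len - rep_levels_byte_len
        }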
1501
1502    #[test]
1503    fn test_file_reader_empty_compressed_datapage_v2() {
1504        // this file has a compressed data page that decompresses to 0 bytes
1505        let test_file = get_test_file("page_v2_empty_compressed.parquet");
1506        let reader_result = SerializedFileReader::new(test_file);
1507        assert!(reader_result.is_ok());
1508        let reader = reader_result.unwrap();
1509
1510        // Test contents in Parquet metadata
1511        let metadata = reader.metadata();
1512        assert_eq!(metadata.num_row_groups(), 1);
1513
1514        // Test contents in file metadata
1515        let file_metadata = metadata.file_metadata();
1516        assert!(file_metadata.created_by().is_some());
1517        assert_eq!(
1518            file_metadata.created_by().unwrap(),
1519            "parquet-cpp-arrow version 14.0.2"
1520        );
1521        assert!(file_metadata.key_value_metadata().is_some());
1522        assert_eq!(
1523            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1524            1
1525        );
1526
1527        assert_eq!(file_metadata.num_rows(), 10);
1528        assert_eq!(file_metadata.version(), 2);
1529        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1530        assert_eq!(
1531            file_metadata.column_orders(),
1532            Some(vec![expected_order].as_ref())
1533        );
1534
1535        let row_group_metadata = metadata.row_group(0);
1536
1537        // Check each column order
1538        for i in 0..row_group_metadata.num_columns() {
1539            assert_eq!(file_metadata.column_order(i), expected_order);
1540        }
1541
1542        // Test row group reader
1543        let row_group_reader_result = reader.get_row_group(0);
1544        assert!(row_group_reader_result.is_ok());
1545        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1546        assert_eq!(
1547            row_group_reader.num_columns(),
1548            row_group_metadata.num_columns()
1549        );
1550        assert_eq!(
1551            row_group_reader.metadata().total_byte_size(),
1552            row_group_metadata.total_byte_size()
1553        );
1554
1555        // Test page readers
1556        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1557        assert!(page_reader_0_result.is_ok());
1558        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1559        let mut page_count = 0;
1560        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1561            let is_expected_page = match page {
1562                Page::DictionaryPage {
1563                    buf,
1564                    num_values,
1565                    encoding,
1566                    is_sorted,
1567                } => {
1568                    assert_eq!(buf.len(), 0);
1569                    assert_eq!(num_values, 0);
1570                    assert_eq!(encoding, Encoding::PLAIN);
1571                    assert!(!is_sorted);
1572                    true
1573                }
1574                Page::DataPageV2 {
1575                    buf,
1576                    num_values,
1577                    encoding,
1578                    num_nulls,
1579                    num_rows,
1580                    def_levels_byte_len,
1581                    rep_levels_byte_len,
1582                    is_compressed,
1583                    statistics,
1584                } => {
1585                    assert_eq!(buf.len(), 3);
1586                    assert_eq!(num_values, 10);
1587                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1588                    assert_eq!(num_nulls, 10);
1589                    assert_eq!(num_rows, 10);
1590                    assert_eq!(def_levels_byte_len, 2);
1591                    assert_eq!(rep_levels_byte_len, 0);
1592                    assert!(is_compressed);
1593                    assert!(statistics.is_none()); // page stats are no longer read
1594                    true
1595                }
1596                _ => false,
1597            };
1598            assert!(is_expected_page);
1599            page_count += 1;
1600        }
1601        assert_eq!(page_count, 2);
1602    }
1603
1604    #[test]
1605    fn test_file_reader_empty_datapage_v2() {
1606        // this file has a 0-byte compressed data page that decompresses to 0 bytes
1607        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
1608        let reader_result = SerializedFileReader::new(test_file);
1609        assert!(reader_result.is_ok());
1610        let reader = reader_result.unwrap();
1611
1612        // Test contents in Parquet metadata
1613        let metadata = reader.metadata();
1614        assert_eq!(metadata.num_row_groups(), 1);
1615
1616        // Test contents in file metadata
1617        let file_metadata = metadata.file_metadata();
1618        assert!(file_metadata.created_by().is_some());
1619        assert_eq!(
1620            file_metadata.created_by().unwrap(),
1621            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
1622        );
1623        assert!(file_metadata.key_value_metadata().is_some());
1624        assert_eq!(
1625            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1626            2
1627        );
1628
1629        assert_eq!(file_metadata.num_rows(), 1);
1630        assert_eq!(file_metadata.version(), 1);
1631        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1632        assert_eq!(
1633            file_metadata.column_orders(),
1634            Some(vec![expected_order].as_ref())
1635        );
1636
1637        let row_group_metadata = metadata.row_group(0);
1638
1639        // Check each column order
1640        for i in 0..row_group_metadata.num_columns() {
1641            assert_eq!(file_metadata.column_order(i), expected_order);
1642        }
1643
1644        // Test row group reader
1645        let row_group_reader_result = reader.get_row_group(0);
1646        assert!(row_group_reader_result.is_ok());
1647        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1648        assert_eq!(
1649            row_group_reader.num_columns(),
1650            row_group_metadata.num_columns()
1651        );
1652        assert_eq!(
1653            row_group_reader.metadata().total_byte_size(),
1654            row_group_metadata.total_byte_size()
1655        );
1656
1657        // Test page readers
1658        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1659        assert!(page_reader_0_result.is_ok());
1660        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1661        let mut page_count = 0;
1662        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1663            let is_expected_page = match page {
1664                Page::DataPageV2 {
1665                    buf,
1666                    num_values,
1667                    encoding,
1668                    num_nulls,
1669                    num_rows,
1670                    def_levels_byte_len,
1671                    rep_levels_byte_len,
1672                    is_compressed,
1673                    statistics,
1674                } => {
1675                    assert_eq!(buf.len(), 2);
1676                    assert_eq!(num_values, 1);
1677                    assert_eq!(encoding, Encoding::PLAIN);
1678                    assert_eq!(num_nulls, 1);
1679                    assert_eq!(num_rows, 1);
1680                    assert_eq!(def_levels_byte_len, 2);
1681                    assert_eq!(rep_levels_byte_len, 0);
1682                    assert!(is_compressed);
1683                    assert!(statistics.is_none());
1684                    true
1685                }
1686                _ => false,
1687            };
1688            assert!(is_expected_page);
1689            page_count += 1;
1690        }
1691        assert_eq!(page_count, 1);
1692    }
1693
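        /// Builds a `SerializedPageReader` for the given `row_group` and `column`
        /// of `file_reader`, forwarding the file's offset index (when present) so
        /// the page reader can navigate via page locations.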
1694    fn get_serialized_page_reader<R: ChunkReader>(
1695        file_reader: &SerializedFileReader<R>,
1696        row_group: usize,
1697        column: usize,
1698    ) -> Result<SerializedPageReader<R>> {
1699        let row_group = {
1700            let row_group_metadata = file_reader.metadata.row_group(row_group);
1701            let props = Arc::clone(&file_reader.props);
1702            let f = Arc::clone(&file_reader.chunk_reader);
1703            SerializedRowGroupReader::new(
1704                f,
1705                row_group_metadata,
1706                file_reader
1707                    .metadata
1708                    .offset_index()
1709                    .map(|x| x[row_group].as_slice()),
1710                props,
1711            )?
1712        };
1713
1714        let col = row_group.metadata.column(column);
1715
1716        let page_locations = row_group
1717            .offset_index
1718            .map(|x| x[column].page_locations.clone());
1719
1720        let props = Arc::clone(&row_group.props);
1721        SerializedPageReader::new_with_properties(
1722            Arc::clone(&row_group.chunk_reader),
1723            col,
1724            usize::try_from(row_group.metadata.num_rows())?,
1725            page_locations,
1726            props,
1727        )
1728    }
1729
1730    #[test]
1731    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
1732        let test_file = get_test_file("alltypes_plain.parquet");
1733        let reader = SerializedFileReader::new(test_file)?;
1734
1735        let mut offset_set = HashSet::new();
1736        let num_row_groups = reader.metadata.num_row_groups();
1737        for row_group in 0..num_row_groups {
1738            let num_columns = reader.metadata.row_group(row_group).num_columns();
1739            for column in 0..num_columns {
1740                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;
1741
1742                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
1743                    match &page_reader.state {
1744                        SerializedPageReaderState::Pages {
1745                            page_locations,
1746                            dictionary_page,
1747                            ..
1748                        } => {
1749                            if let Some(page) = dictionary_page {
1750                                assert_eq!(page.offset as u64, page_offset);
1751                            } else if let Some(page) = page_locations.front() {
1752                                assert_eq!(page.offset as u64, page_offset);
1753                            } else {
1754                                unreachable!()
1755                            }
1756                        }
1757                        SerializedPageReaderState::Values {
1758                            offset,
1759                            next_page_header,
1760                            ..
1761                        } => {
1762                            assert!(next_page_header.is_some());
1763                            assert_eq!(*offset, page_offset);
1764                        }
1765                    }
1766                    let page = page_reader.get_next_page()?;
1767                    assert!(page.is_some());
1768                    let newly_inserted = offset_set.insert(page_offset);
1769                    assert!(newly_inserted);
1770                }
1771            }
1772        }
1773
1774        Ok(())
1775    }
1776
1777    #[test]
1778    fn test_page_iterator() {
1779        let file = get_test_file("alltypes_plain.parquet");
1780        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1781
1782        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1783
1784        // read first page
1785        let page = page_iterator.next();
1786        assert!(page.is_some());
1787        assert!(page.unwrap().is_ok());
1788
1789        // reach end of file
1790        let page = page_iterator.next();
1791        assert!(page.is_none());
1792
1793        let row_group_indices = Box::new(0..1);
1794        let mut page_iterator =
1795            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1796
1797        // read first page
1798        let page = page_iterator.next();
1799        assert!(page.is_some());
1800        assert!(page.unwrap().is_ok());
1801
1802        // reach end of file
1803        let page = page_iterator.next();
1804        assert!(page.is_none());
1805    }
1806
1807    #[test]
1808    fn test_file_reader_key_value_metadata() {
1809        let file = get_test_file("binary.parquet");
1810        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1811
1812        let metadata = file_reader
1813            .metadata
1814            .file_metadata()
1815            .key_value_metadata()
1816            .unwrap();
1817
1818        assert_eq!(metadata.len(), 3);
1819
1820        assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1821
1822        assert_eq!(metadata[1].key, "writer.model.name");
1823        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1824
1825        assert_eq!(metadata[2].key, "parquet.proto.class");
1826        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1827    }
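
        // Key/value footer metadata is a plain list of `KeyValue` pairs, so it
        // can be collected into a map for lookup. A sketch under that assumption;
        // `kv_to_map` is a hypothetical helper, not used by the tests:
        #[allow(dead_code)]
        fn kv_to_map(
            kv: &[crate::file::metadata::KeyValue],
        ) -> std::collections::HashMap<&str, Option<&str>> {
            kv.iter()
                .map(|item| (item.key.as_str(), item.value.as_deref()))
                .collect()
        }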
1828
1829    #[test]
1830    fn test_file_reader_optional_metadata() {
1831        // file with optional metadata: bloom filters, encoding stats, column index and offset index.
1832        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1833        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1834
1835        let row_group_metadata = file_reader.metadata.row_group(0);
1836        let col0_metadata = row_group_metadata.column(0);
1837
1838        // test optional bloom filter offset
1839        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1840
1841        // test page encoding stats
1842        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1843
1844        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1845        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1846        assert_eq!(page_encoding_stats.count, 1);
1847
1848        // test optional column index offset
1849        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1850        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1851
1852        // test optional offset index offset
1853        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1854        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1855    }
1856
1857    #[test]
1858    fn test_file_reader_with_no_filter() -> Result<()> {
1859        let test_file = get_test_file("alltypes_plain.parquet");
1860        let origin_reader = SerializedFileReader::new(test_file)?;
1861        // test initial number of row groups
1862        let metadata = origin_reader.metadata();
1863        assert_eq!(metadata.num_row_groups(), 1);
1864        Ok(())
1865    }
1866
1867    #[test]
1868    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1869        let test_file = get_test_file("alltypes_plain.parquet");
1870        let read_options = ReadOptionsBuilder::new()
1871            .with_predicate(Box::new(|_, _| false))
1872            .build();
1873        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1874        let metadata = reader.metadata();
1875        assert_eq!(metadata.num_row_groups(), 0);
1876        Ok(())
1877    }
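
        // The predicate receives each row group's metadata and its index, so it
        // can filter on any metadata field, not just constants. A sketch that
        // keeps only non-empty row groups; `non_empty_row_groups` is a
        // hypothetical helper, not used by the tests:
        #[allow(dead_code)]
        fn non_empty_row_groups(file: File) -> Result<SerializedFileReader<File>> {
            let options = ReadOptionsBuilder::new()
                // keep a row group iff it contains at least one row
                .with_predicate(Box::new(|metadata, _index| metadata.num_rows() > 0))
                .build();
            SerializedFileReader::new_with_options(file, options)
        }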
1878
1879    #[test]
1880    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1881        let test_file = get_test_file("alltypes_plain.parquet");
1882        let origin_reader = SerializedFileReader::new(test_file)?;
1883        // test initial number of row groups
1884        let metadata = origin_reader.metadata();
1885        assert_eq!(metadata.num_row_groups(), 1);
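            // with_range keeps a row group iff the midpoint offset of its byte
            // range falls inside the half-open interval [start, end)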
1886        let mid = get_midpoint_offset(metadata.row_group(0));
1887
1888        let test_file = get_test_file("alltypes_plain.parquet");
1889        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1890        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1891        let metadata = reader.metadata();
1892        assert_eq!(metadata.num_row_groups(), 1);
1893
1894        let test_file = get_test_file("alltypes_plain.parquet");
1895        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1896        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1897        let metadata = reader.metadata();
1898        assert_eq!(metadata.num_row_groups(), 0);
1899        Ok(())
1900    }
1901
1902    #[test]
1903    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1904        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1905        let origin_reader = SerializedFileReader::new(test_file)?;
1906        let metadata = origin_reader.metadata();
1907        let mid = get_midpoint_offset(metadata.row_group(0));
1908
1909        // true, true predicate
1910        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1911        let read_options = ReadOptionsBuilder::new()
1912            .with_page_index()
1913            .with_predicate(Box::new(|_, _| true))
1914            .with_range(mid, mid + 1)
1915            .build();
1916        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1917        let metadata = reader.metadata();
1918        assert_eq!(metadata.num_row_groups(), 1);
1919        assert_eq!(metadata.column_index().unwrap().len(), 1);
1920        assert_eq!(metadata.offset_index().unwrap().len(), 1);
1921
1922        // true, false predicate
1923        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1924        let read_options = ReadOptionsBuilder::new()
1925            .with_page_index()
1926            .with_predicate(Box::new(|_, _| true))
1927            .with_range(0, mid)
1928            .build();
1929        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1930        let metadata = reader.metadata();
1931        assert_eq!(metadata.num_row_groups(), 0);
1932        assert!(metadata.column_index().is_none());
1933        assert!(metadata.offset_index().is_none());
1934
1935        // false, true predicate
1936        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1937        let read_options = ReadOptionsBuilder::new()
1938            .with_page_index()
1939            .with_predicate(Box::new(|_, _| false))
1940            .with_range(mid, mid + 1)
1941            .build();
1942        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1943        let metadata = reader.metadata();
1944        assert_eq!(metadata.num_row_groups(), 0);
1945        assert!(metadata.column_index().is_none());
1946        assert!(metadata.offset_index().is_none());
1947
1948        // false, false predicate
1949        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1950        let read_options = ReadOptionsBuilder::new()
1951            .with_page_index()
1952            .with_predicate(Box::new(|_, _| false))
1953            .with_range(0, mid)
1954            .build();
1955        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1956        let metadata = reader.metadata();
1957        assert_eq!(metadata.num_row_groups(), 0);
1958        assert!(metadata.column_index().is_none());
1959        assert!(metadata.offset_index().is_none());
1960        Ok(())
1961    }
1962
1963    #[test]
1964    fn test_file_reader_invalid_metadata() {
1965        let data = [
1966            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1967            80, 65, 82, 49,
1968        ];
1969        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1970        assert_eq!(
1971            ret.err().unwrap().to_string(),
1972            "Parquet error: Received empty union from remote ColumnOrder"
1973        );
1974    }
1975
1976    #[test]
1977    // The page index info below was obtained with the Java parquet-tools:
1978    //
1979    // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
1980    // row group 0:
1981    // column index for column String:
1982    // Boundary order: ASCENDING
1983    // page-0  :
1984    // null count                 min                                  max
1985    // 0                          Hello                                today
1986    //
1987    // offset index for column String:
1988    // page-0   :
1989    // offset   compressed size       first row index
1990    // 4               152                     0
1991    //
1992    //
1993    fn test_page_index_reader() {
1994        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
1995        let builder = ReadOptionsBuilder::new();
1996        // enable reading the page index
1997        let options = builder.with_page_index().build();
1998        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1999        let reader = reader_result.unwrap();
2000
2001        // Test contents in Parquet metadata
2002        let metadata = reader.metadata();
2003        assert_eq!(metadata.num_row_groups(), 1);
2004
2005        let column_index = metadata.column_index().unwrap();
2006
2007        // only one row group
2008        assert_eq!(column_index.len(), 1);
2009        let index = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
2010            index
2011        } else {
2012            unreachable!()
2013        };
2014
2015        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
2016
2017        // only one page
2018        assert_eq!(index.num_pages(), 1);
2019
2020        let min = index.min_value(0).unwrap();
2021        let max = index.max_value(0).unwrap();
2022        assert_eq!(b"Hello", min.as_bytes());
2023        assert_eq!(b"today", max.as_bytes());
2024
2025        let offset_indexes = metadata.offset_index().unwrap();
2026        // only one row group
2027        assert_eq!(offset_indexes.len(), 1);
2028        let offset_index = &offset_indexes[0];
2029        let page_offset = &offset_index[0].page_locations()[0];
2030
2031        assert_eq!(4, page_offset.offset);
2032        assert_eq!(152, page_offset.compressed_page_size);
2033        assert_eq!(0, page_offset.first_row_index);
2034    }
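
        // Each `PageLocation` carries enough to fetch its page directly: the
        // absolute file offset and the compressed size. A sketch of slicing the
        // raw page bytes out of an in-memory file; `raw_page_bytes` is a
        // hypothetical helper, not used by the tests:
        #[allow(dead_code)]
        fn raw_page_bytes(
            data: &Bytes,
            location: &crate::file::page_index::offset_index::PageLocation,
        ) -> Bytes {
            let start = location.offset as usize;
            // the page (header + compressed body) occupies exactly
            // compressed_page_size bytes starting at the recorded offset
            data.slice(start..start + location.compressed_page_size as usize)
        }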
2035
2036    #[test]
2037    #[allow(deprecated)]
2038    fn test_page_index_reader_out_of_order() {
2039        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2040        let options = ReadOptionsBuilder::new().with_page_index().build();
2041        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
2042        let metadata = reader.metadata();
2043
2044        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2045        let columns = metadata.row_group(0).columns();
2046        let reversed: Vec<_> = columns.iter().cloned().rev().collect();
2047
2048        let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
2049        let mut b = read_columns_indexes(&test_file, &reversed)
2050            .unwrap()
2051            .unwrap();
2052        b.reverse();
2053        assert_eq!(a, b);
2054
2055        let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
2056        let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
2057        b.reverse();
2058        assert_eq!(a, b);
2059    }
2060
2061    #[test]
2062    fn test_page_index_reader_all_type() {
2063        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2064        let builder = ReadOptionsBuilder::new();
2065        // enable reading the page index
2066        let options = builder.with_page_index().build();
2067        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2068        let reader = reader_result.unwrap();
2069
2070        // Test contents in Parquet metadata
2071        let metadata = reader.metadata();
2072        assert_eq!(metadata.num_row_groups(), 1);
2073
2074        let column_index = metadata.column_index().unwrap();
2075        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
2076
2077        // only one row group
2078        assert_eq!(column_index.len(), 1);
2079        let row_group_metadata = metadata.row_group(0);
2080
2081        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
2082        assert!(!&column_index[0][0].is_sorted());
2083        let boundary_order = &column_index[0][0].get_boundary_order();
2084        assert!(boundary_order.is_some());
2085        assert!(matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED));
2086        if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
2087            check_native_page_index(
2088                index,
2089                325,
2090                get_row_group_min_max_bytes(row_group_metadata, 0),
2091                BoundaryOrder::UNORDERED,
2092            );
2093            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2094        } else {
2095            unreachable!()
2096        };
2097        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
2098        assert!(&column_index[0][1].is_sorted());
2099        if let ColumnIndexMetaData::BOOLEAN(index) = &column_index[0][1] {
2100            assert_eq!(index.num_pages(), 82);
2101            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2102        } else {
2103            unreachable!()
2104        };
2105        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2106        assert!(&column_index[0][2].is_sorted());
2107        if let ColumnIndexMetaData::INT32(index) = &column_index[0][2] {
2108            check_native_page_index(
2109                index,
2110                325,
2111                get_row_group_min_max_bytes(row_group_metadata, 2),
2112                BoundaryOrder::ASCENDING,
2113            );
2114            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2115        } else {
2116            unreachable!()
2117        };
2118        //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2119        assert!(&column_index[0][3].is_sorted());
2120        if let ColumnIndexMetaData::INT32(index) = &column_index[0][3] {
2121            check_native_page_index(
2122                index,
2123                325,
2124                get_row_group_min_max_bytes(row_group_metadata, 3),
2125                BoundaryOrder::ASCENDING,
2126            );
2127            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2128        } else {
2129            unreachable!()
2130        };
2131        //col5->int_col: INT32 UNCOMPRESSED VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2132        assert!(&column_index[0][4].is_sorted());
2133        if let ColumnIndexMetaData::INT32(index) = &column_index[0][4] {
2134            check_native_page_index(
2135                index,
2136                325,
2137                get_row_group_min_max_bytes(row_group_metadata, 4),
2138                BoundaryOrder::ASCENDING,
2139            );
2140            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2141        } else {
2142            unreachable!()
2143        };
2144        //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
2145        assert!(!&column_index[0][5].is_sorted());
2146        if let ColumnIndexMetaData::INT64(index) = &column_index[0][5] {
2147            check_native_page_index(
2148                index,
2149                528,
2150                get_row_group_min_max_bytes(row_group_metadata, 5),
2151                BoundaryOrder::UNORDERED,
2152            );
2153            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2154        } else {
2155            unreachable!()
2156        };
2157        //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
2158        assert!(&column_index[0][6].is_sorted());
2159        if let ColumnIndexMetaData::FLOAT(index) = &column_index[0][6] {
2160            check_native_page_index(
2161                index,
2162                325,
2163                get_row_group_min_max_bytes(row_group_metadata, 6),
2164                BoundaryOrder::ASCENDING,
2165            );
2166            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2167        } else {
2168            unreachable!()
2169        };
2170        //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
2171        assert!(!&column_index[0][7].is_sorted());
2172        if let ColumnIndexMetaData::DOUBLE(index) = &column_index[0][7] {
2173            check_native_page_index(
2174                index,
2175                528,
2176                get_row_group_min_max_bytes(row_group_metadata, 7),
2177                BoundaryOrder::UNORDERED,
2178            );
2179            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2180        } else {
2181            unreachable!()
2182        };
2183        //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
2184        assert!(!&column_index[0][8].is_sorted());
2185        if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][8] {
2186            check_byte_array_page_index(
2187                index,
2188                974,
2189                get_row_group_min_max_bytes(row_group_metadata, 8),
2190                BoundaryOrder::UNORDERED,
2191            );
2192            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2193        } else {
2194            unreachable!()
2195        };
2196        //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2197        assert!(&column_index[0][9].is_sorted());
2198        if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][9] {
2199            check_byte_array_page_index(
2200                index,
2201                352,
2202                get_row_group_min_max_bytes(row_group_metadata, 9),
2203                BoundaryOrder::ASCENDING,
2204            );
2205            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2206        } else {
2207            unreachable!()
2208        };
2209        //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
2210        // Note: per-page min/max values do not exist for this column.
2211        assert!(!&column_index[0][10].is_sorted());
2212        if let ColumnIndexMetaData::NONE = &column_index[0][10] {
2213            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2214        } else {
2215            unreachable!()
2216        };
2217        //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
2218        assert!(&column_index[0][11].is_sorted());
2219        if let ColumnIndexMetaData::INT32(index) = &column_index[0][11] {
2220            check_native_page_index(
2221                index,
2222                325,
2223                get_row_group_min_max_bytes(row_group_metadata, 11),
2224                BoundaryOrder::ASCENDING,
2225            );
2226            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2227        } else {
2228            unreachable!()
2229        };
2230        //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
2231        assert!(!&column_index[0][12].is_sorted());
2232        if let ColumnIndexMetaData::INT32(index) = &column_index[0][12] {
2233            check_native_page_index(
2234                index,
2235                325,
2236                get_row_group_min_max_bytes(row_group_metadata, 12),
2237                BoundaryOrder::UNORDERED,
2238            );
2239            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2240        } else {
2241            unreachable!()
2242        };
2243    }
2244
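        /// Asserts that a primitive column index has the expected page count and
        /// boundary order, and that every per-page min value lies within the row
        /// group's overall [min, max] statistics.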
2245    fn check_native_page_index<T: ParquetValueType>(
2246        row_group_index: &PrimitiveColumnIndex<T>,
2247        page_size: usize,
2248        min_max: (&[u8], &[u8]),
2249        boundary_order: BoundaryOrder,
2250    ) {
2251        assert_eq!(row_group_index.num_pages() as usize, page_size);
2252        assert_eq!(row_group_index.boundary_order, boundary_order);
2253        assert!(row_group_index.min_values().iter().all(|x| {
2254            x >= &T::try_from_le_slice(min_max.0).unwrap()
2255                && x <= &T::try_from_le_slice(min_max.1).unwrap()
2256        }));
2257    }
2258
2259    fn check_byte_array_page_index(
2260        row_group_index: &ByteArrayColumnIndex,
2261        page_size: usize,
2262        min_max: (&[u8], &[u8]),
2263        boundary_order: BoundaryOrder,
2264    ) {
2265        assert_eq!(row_group_index.num_pages() as usize, page_size);
2266        assert_eq!(row_group_index.boundary_order, boundary_order);
2267        for i in 0..row_group_index.num_pages() as usize {
2268            let x = row_group_index.min_value(i).unwrap();
2269            assert!(x >= min_max.0 && x <= min_max.1);
2270        }
2271    }
2272
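        /// Returns the column chunk's (min, max) statistics for `col_num` as raw
        /// little-endian bytes, falling back to empty slices when absent.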
2273    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2274        let statistics = r.column(col_num).statistics().unwrap();
2275        (
2276            statistics.min_bytes_opt().unwrap_or_default(),
2277            statistics.max_bytes_opt().unwrap_or_default(),
2278        )
2279    }
2280
2281    #[test]
2282    fn test_skip_next_page_with_dictionary_page() {
2283        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2284        let builder = ReadOptionsBuilder::new();
2285        // enable reading the page index
2286        let options = builder.with_page_index().build();
2287        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2288        let reader = reader_result.unwrap();
2289
2290        let row_group_reader = reader.get_row_group(0).unwrap();
2291
2292        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
2293        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2294
2295        let mut vec = vec![];
2296
2297        // Step 1: Peek and ensure dictionary page is correctly identified
2298        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2299        assert!(meta.is_dict);
2300
2301        // Step 2: Call skip_next_page to skip the dictionary page
2302        column_page_reader.skip_next_page().unwrap();
2303
2304        // Step 3: Read the next data page after skipping the dictionary page
2305        let page = column_page_reader.get_next_page().unwrap().unwrap();
2306        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2307
2308        // Step 4: Continue reading remaining data pages and verify correctness
2309        for _i in 0..351 {
2310            // 351 data pages remain: one of the 352 was read above and the dictionary page was skipped
2311            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2312            assert!(!meta.is_dict); // Verify no dictionary page here
2313            vec.push(meta);
2314
2315            let page = column_page_reader.get_next_page().unwrap().unwrap();
2316            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2317        }
2318
2319        // Step 5: Check if all pages are read
2320        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2321        assert!(column_page_reader.get_next_page().unwrap().is_none());
2322
2323        // Step 6: Verify the number of data pages read (should be 351 data pages)
2324        assert_eq!(vec.len(), 351);
2325    }
2326
2327    #[test]
2328    fn test_skip_page_with_offset_index() {
2329        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2330        let builder = ReadOptionsBuilder::new();
2331        // enable reading the page index
2332        let options = builder.with_page_index().build();
2333        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2334        let reader = reader_result.unwrap();
2335
2336        let row_group_reader = reader.get_row_group(0).unwrap();
2337
2338        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
2339        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2340
2341        let mut vec = vec![];
2342
2343        for i in 0..325 {
2344            if i % 2 == 0 {
2345                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2346            } else {
2347                column_page_reader.skip_next_page().unwrap();
2348            }
2349        }
2350        // check that all pages have been read.
2351        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2352        assert!(column_page_reader.get_next_page().unwrap().is_none());
2353
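            // 163 pages collected: the even indices 0, 2, ..., 324 of the 325
            // pages, i.e. ceil(325 / 2)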
2354        assert_eq!(vec.len(), 163);
2355    }
2356
2357    #[test]
2358    fn test_skip_page_without_offset_index() {
2359        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2360
2361        // use the default SerializedFileReader, which does not read the offset index
2362        let reader_result = SerializedFileReader::new(test_file);
2363        let reader = reader_result.unwrap();
2364
2365        let row_group_reader = reader.get_row_group(0).unwrap();
2366
2367        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
2368        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2369
2370        let mut vec = vec![];
2371
2372        for i in 0..325 {
2373            if i % 2 == 0 {
2374                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2375            } else {
2376                column_page_reader.peek_next_page().unwrap().unwrap();
2377                column_page_reader.skip_next_page().unwrap();
2378            }
2379        }
2380        // check that all pages have been read.
2381        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2382        assert!(column_page_reader.get_next_page().unwrap().is_none());
2383
2384        assert_eq!(vec.len(), 163);
2385    }
2386
2387    #[test]
2388    fn test_peek_page_with_dictionary_page() {
2389        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2390        let builder = ReadOptionsBuilder::new();
2391        // enable reading the page index
2392        let options = builder.with_page_index().build();
2393        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2394        let reader = reader_result.unwrap();
2395        let row_group_reader = reader.get_row_group(0).unwrap();
2396
2397        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
2398        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2399
2400        let mut vec = vec![];
2401
2402        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2403        assert!(meta.is_dict);
2404        let page = column_page_reader.get_next_page().unwrap().unwrap();
2405        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2406
2407        for i in 0..352 {
2408            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2409            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
2410            // every page has num_rows of either 21 or 20, except the last page, which has 10 rows.
2411            if i != 351 {
2412                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2413            } else {
2414                // the last page's first row index is 7290 and the total row count is 7300,
2415                // so with zero-based row indexes the last page holds 7300 - 7290 = 10 rows.
2416                assert_eq!(meta.num_rows, Some(10));
2417            }
2418            assert!(!meta.is_dict);
2419            vec.push(meta);
2420            let page = column_page_reader.get_next_page().unwrap().unwrap();
2421            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2422        }
2423
2424        // check that all pages have been read.
2425        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2426        assert!(column_page_reader.get_next_page().unwrap().is_none());
2427
2428        assert_eq!(vec.len(), 352);
2429    }
2430
2431    #[test]
2432    fn test_peek_page_with_dictionary_page_without_offset_index() {
2433        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2434
2435        let reader_result = SerializedFileReader::new(test_file);
2436        let reader = reader_result.unwrap();
2437        let row_group_reader = reader.get_row_group(0).unwrap();
2438
2439        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
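            // without the offset index, peeking only parses the next page header,
            // so the metadata below reports num_levels rather than num_rows
            // (compare test_peek_page_with_dictionary_page above)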
2440        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2441
2442        let mut vec = vec![];
2443
2444        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2445        assert!(meta.is_dict);
2446        let page = column_page_reader.get_next_page().unwrap().unwrap();
2447        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2448
2449        for i in 0..352 {
2450            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2451            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
2452            // every page has num_levels of either 21 or 20, except the last page, which has 10.
2453            if i != 351 {
2454                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2455            } else {
2456                // the last page's first row index is 7290 and the total row count is 7300,
2457                // so with zero-based row indexes the last page holds 7300 - 7290 = 10 rows.
2458                assert_eq!(meta.num_levels, Some(10));
2459            }
2460            assert!(!meta.is_dict);
2461            vec.push(meta);
2462            let page = column_page_reader.get_next_page().unwrap().unwrap();
2463            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2464        }
2465
2466        // check that all pages have been read.
2467        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2468        assert!(column_page_reader.get_next_page().unwrap().is_none());
2469
2470        assert_eq!(vec.len(), 352);
2471    }
2472
2473    #[test]
2474    fn test_fixed_length_index() {
2475        let message_type = "
2476        message test_schema {
2477          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2478        }
2479        ";
2480
2481        let schema = parse_message_type(message_type).unwrap();
2482        let mut out = Vec::with_capacity(1024);
2483        let mut writer =
2484            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2485
2486        let mut r = writer.next_row_group().unwrap();
2487        let mut c = r.next_column().unwrap().unwrap();
2488        c.typed::<FixedLenByteArrayType>()
2489            .write_batch(
2490                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2491                Some(&[1, 1, 0, 1]),
2492                None,
2493            )
2494            .unwrap();
2495        c.close().unwrap();
2496        r.close().unwrap();
2497        writer.close().unwrap();
2498
2499        let b = Bytes::from(out);
2500        let options = ReadOptionsBuilder::new().with_page_index().build();
2501        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2502        let index = reader.metadata().column_index().unwrap();
2503
2504        // 1 row group
2505        assert_eq!(index.len(), 1);
2506        let c = &index[0];
2507        // 1 column
2508        assert_eq!(c.len(), 1);
2509
2510        match &c[0] {
2511            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
2512                assert_eq!(v.num_pages(), 1);
2513                assert_eq!(v.null_count(0).unwrap(), 1);
2514                assert_eq!(v.min_value(0).unwrap(), &[0; 11]);
2515                assert_eq!(v.max_value(0).unwrap(), &[5; 11]);
2516            }
2517            _ => unreachable!(),
2518        }
2519    }
2520
2521    #[test]
2522    fn test_multi_gz() {
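            // "concatenated_gzip_members.parquet" holds a column chunk whose gzip
            // stream contains multiple concatenated members; decompression must
            // continue across member boundaries to recover all 513 values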
2523        let file = get_test_file("concatenated_gzip_members.parquet");
2524        let reader = SerializedFileReader::new(file).unwrap();
2525        let row_group_reader = reader.get_row_group(0).unwrap();
2526        match row_group_reader.get_column_reader(0).unwrap() {
2527            ColumnReader::Int64ColumnReader(mut reader) => {
2528                let mut buffer = Vec::with_capacity(1024);
2529                let mut def_levels = Vec::with_capacity(1024);
2530                let (num_records, num_values, num_levels) = reader
2531                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2532                    .unwrap();
2533
2534                assert_eq!(num_records, 513);
2535                assert_eq!(num_values, 513);
2536                assert_eq!(num_levels, 513);
2537
2538                let expected: Vec<i64> = (1..514).collect();
2539                assert_eq!(&buffer, &expected);
2540            }
2541            _ => unreachable!(),
2542        }
2543    }
2544
2545    #[test]
2546    fn test_byte_stream_split_extended() {
2547        let path = format!(
2548            "{}/byte_stream_split_extended.gzip.parquet",
2549            arrow::util::test_util::parquet_test_data(),
2550        );
2551        let file = File::open(path).unwrap();
2552        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2553
2554        // Use full schema as projected schema
2555        let mut iter = reader
2556            .get_row_iter(None)
2557            .expect("Failed to create row iterator");
2558
2559        let mut start = 0;
2560        let end = reader.metadata().file_metadata().num_rows();
2561
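            // the file stores each value twice, once plainly encoded and once as
            // its BYTE_STREAM_SPLIT-encoded twin; every row must decode to the
            // same value in both columns of a pair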
2562        let check_row = |row: Result<Row, ParquetError>| {
2563            assert!(row.is_ok());
2564            let r = row.unwrap();
2565            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2566            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2567            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2568            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2569            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2570            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2571            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2572        };
2573
2574        while start < end {
2575            match iter.next() {
2576                Some(row) => check_row(row),
2577                None => break,
2578            };
2579            start += 1;
2580        }
2581    }
2582
2583    #[test]
2584    fn test_filtered_rowgroup_metadata() {
2585        let message_type = "
2586            message test_schema {
2587                REQUIRED INT32 a;
2588            }
2589        ";
2590        let schema = Arc::new(parse_message_type(message_type).unwrap());
2591        let props = Arc::new(
2592            WriterProperties::builder()
2593                .set_statistics_enabled(EnabledStatistics::Page)
2594                .build(),
2595        );
2596        let mut file: File = tempfile::tempfile().unwrap();
2597        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2598        let data = [1, 2, 3, 4, 5];
2599
2600        // write 5 row groups
2601        for idx in 0..5 {
2602            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2603            let mut row_group_writer = file_writer.next_row_group().unwrap();
2604            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2605                writer
2606                    .typed::<Int32Type>()
2607                    .write_batch(data_i.as_slice(), None, None)
2608                    .unwrap();
2609                writer.close().unwrap();
2610            }
2611            row_group_writer.close().unwrap();
2612            file_writer.flushed_row_groups();
2613        }
2614        let file_metadata = file_writer.close().unwrap();
2615
2616        assert_eq!(file_metadata.file_metadata().num_rows(), 25);
2617        assert_eq!(file_metadata.num_row_groups(), 5);
2618
2619        // read only the 3rd row group
2620        let read_options = ReadOptionsBuilder::new()
2621            .with_page_index()
2622            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2623            .build();
2624        let reader =
2625            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2626                .unwrap();
2627        let metadata = reader.metadata();
2628
2629        // check we got the expected row group
2630        assert_eq!(metadata.num_row_groups(), 1);
2631        assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2632
2633        // check we only got the relevant page indexes
2634        assert!(metadata.column_index().is_some());
2635        assert!(metadata.offset_index().is_some());
2636        assert_eq!(metadata.column_index().unwrap().len(), 1);
2637        assert_eq!(metadata.offset_index().unwrap().len(), 1);
2638        let col_idx = metadata.column_index().unwrap();
2639        let off_idx = metadata.offset_index().unwrap();
2640        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2641        let pg_idx = &col_idx[0][0];
2642        let off_idx_i = &off_idx[0][0];
2643
2644        // test that we got the index matching the row group
2645        match pg_idx {
2646            ColumnIndexMetaData::INT32(int_idx) => {
2647                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2648                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2649                assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2650                assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2651            }
2652            _ => panic!("wrong stats type"),
2653        }
2654
2655        // check offset index matches too
2656        assert_eq!(
2657            off_idx_i.page_locations[0].offset,
2658            metadata.row_group(0).column(0).data_page_offset()
2659        );
2660
2661        // read non-contiguous row groups
2662        let read_options = ReadOptionsBuilder::new()
2663            .with_page_index()
2664            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2665            .build();
2666        let reader =
2667            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2668                .unwrap();
2669        let metadata = reader.metadata();
2670
2671        // check we got the expected row groups
2672        assert_eq!(metadata.num_row_groups(), 2);
2673        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2674        assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2675
2676        // check we only got the relevant page indexes
2677        assert!(metadata.column_index().is_some());
2678        assert!(metadata.offset_index().is_some());
2679        assert_eq!(metadata.column_index().unwrap().len(), 2);
2680        assert_eq!(metadata.offset_index().unwrap().len(), 2);
2681        let col_idx = metadata.column_index().unwrap();
2682        let off_idx = metadata.offset_index().unwrap();
2683
2684        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2685            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2686            let pg_idx = &col_idx_i[0];
2687            let off_idx_i = &off_idx[i][0];
2688
2689            // test that we got the index matching the row group
2690            match pg_idx {
2691                ColumnIndexMetaData::INT32(int_idx) => {
2692                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2693                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2694                    assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2695                    assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2696                }
2697                _ => panic!("wrong stats type"),
2698            }
2699
2700            // check offset index matches too
2701            assert_eq!(
2702                off_idx_i.page_locations[0].offset,
2703                metadata.row_group(i).column(0).data_page_offset()
2704            );
2705        }
2706    }
2707}