parquet/file/
serialized_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
19//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
20
21use crate::basic::{PageType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{Codec, create_codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{CryptoContext, read_and_decrypt};
27use crate::errors::{ParquetError, Result};
28use crate::file::metadata::thrift::PageHeader;
29use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
30use crate::file::statistics;
31use crate::file::{
32    metadata::*,
33    properties::{ReaderProperties, ReaderPropertiesPtr},
34    reader::*,
35};
36#[cfg(feature = "encryption")]
37use crate::parquet_thrift::ThriftSliceInputProtocol;
38use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
39use crate::record::Row;
40use crate::record::reader::RowIter;
41use crate::schema::types::{SchemaDescPtr, Type as SchemaType};
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::{fs::File, io::Read, path::Path, sync::Arc};
45
/// Conversion from an open [`File`]; eagerly parses the Parquet footer.
impl TryFrom<File> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(file: File) -> Result<Self> {
        // Delegates to `new`, which reads and validates the file metadata.
        Self::new(file)
    }
}
53
54impl TryFrom<&Path> for SerializedFileReader<File> {
55    type Error = ParquetError;
56
57    fn try_from(path: &Path) -> Result<Self> {
58        let file = File::open(path)?;
59        Self::try_from(file)
60    }
61}
62
63impl TryFrom<String> for SerializedFileReader<File> {
64    type Error = ParquetError;
65
66    fn try_from(path: String) -> Result<Self> {
67        Self::try_from(Path::new(&path))
68    }
69}
70
71impl TryFrom<&str> for SerializedFileReader<File> {
72    type Error = ParquetError;
73
74    fn try_from(path: &str) -> Result<Self> {
75        Self::try_from(Path::new(&path))
76    }
77}
78
/// Conversion into a [`RowIter`]
/// using the full file schema over all row groups.
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        // Box the reader so the iterator owns it, giving a 'static iterator.
        RowIter::from_file_into(Box::new(self))
    }
}
89
90// ----------------------------------------------------------------------
91// Implementations of file & row group readers
92
/// A serialized implementation for Parquet [`FileReader`].
pub struct SerializedFileReader<R: ChunkReader> {
    /// Underlying chunk reader, shared with the row-group readers it spawns
    chunk_reader: Arc<R>,
    /// Parsed footer metadata for the file
    metadata: Arc<ParquetMetaData>,
    /// Reader configuration, shared with readers created from this one
    props: ReaderPropertiesPtr,
}
99
/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
104
/// A builder for [`ReadOptions`].
/// For the predicates that are added to the builder,
/// they will be chained using 'AND' to filter the row groups.
#[derive(Default)]
pub struct ReadOptionsBuilder {
    /// Row-group predicates; all must accept a row group for it to be read
    predicates: Vec<ReadGroupPredicate>,
    /// Whether to also load the page index structures
    enable_page_index: bool,
    /// Optional reader properties; defaults are built when `None`
    props: Option<ReaderProperties>,
    /// Options controlling how the footer metadata is decoded
    metadata_options: ParquetMetaDataOptions,
}
115
116impl ReadOptionsBuilder {
117    /// New builder
118    pub fn new() -> Self {
119        Self::default()
120    }
121
122    /// Add a predicate on row group metadata to the reading option,
123    /// Filter only row groups that match the predicate criteria
124    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
125        self.predicates.push(predicate);
126        self
127    }
128
129    /// Add a range predicate on filtering row groups if their midpoints are within
130    /// the Closed-Open range `[start..end) {x | start <= x < end}`
131    pub fn with_range(mut self, start: i64, end: i64) -> Self {
132        assert!(start < end);
133        let predicate = move |rg: &RowGroupMetaData, _: usize| {
134            let mid = get_midpoint_offset(rg);
135            mid >= start && mid < end
136        };
137        self.predicates.push(Box::new(predicate));
138        self
139    }
140
141    /// Enable reading the page index structures described in
142    /// "[Column Index] Layout to Support Page Skipping"
143    ///
144    /// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
145    pub fn with_page_index(mut self) -> Self {
146        self.enable_page_index = true;
147        self
148    }
149
150    /// Set the [`ReaderProperties`] configuration.
151    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
152        self.props = Some(properties);
153        self
154    }
155
156    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
157    /// footer will be skipped.
158    pub fn with_parquet_schema(mut self, schema: SchemaDescPtr) -> Self {
159        self.metadata_options.set_schema(schema);
160        self
161    }
162
163    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
164    /// (defaults to `false`).
165    ///
166    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
167    /// might be desirable.
168    ///
169    /// [`encoding_stats`]:
170    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
171    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
172        self.metadata_options.set_encoding_stats_as_mask(val);
173        self
174    }
175
176    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
177    ///
178    /// [`encoding_stats`]:
179    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
180    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
181        self.metadata_options.set_encoding_stats_policy(policy);
182        self
183    }
184
185    /// Seal the builder and return the read options
186    pub fn build(self) -> ReadOptions {
187        let props = self
188            .props
189            .unwrap_or_else(|| ReaderProperties::builder().build());
190        ReadOptions {
191            predicates: self.predicates,
192            enable_page_index: self.enable_page_index,
193            props,
194            metadata_options: self.metadata_options,
195        }
196    }
197}
198
/// A collection of options for reading a Parquet file.
///
/// Predicates are currently only supported on row group metadata.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
    /// Row-group predicates; all must accept a row group for it to be read
    predicates: Vec<ReadGroupPredicate>,
    /// Whether to also load the page index structures
    enable_page_index: bool,
    /// Reader configuration used by the resulting file reader
    props: ReaderProperties,
    /// Options controlling how the footer metadata is decoded
    metadata_options: ParquetMetaDataOptions,
}
209
210impl<R: 'static + ChunkReader> SerializedFileReader<R> {
211    /// Creates file reader from a Parquet file.
212    /// Returns an error if the Parquet file does not exist or is corrupt.
213    pub fn new(chunk_reader: R) -> Result<Self> {
214        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
215        let props = Arc::new(ReaderProperties::builder().build());
216        Ok(Self {
217            chunk_reader: Arc::new(chunk_reader),
218            metadata: Arc::new(metadata),
219            props,
220        })
221    }
222
223    /// Creates file reader from a Parquet file with read options.
224    /// Returns an error if the Parquet file does not exist or is corrupt.
225    #[allow(deprecated)]
226    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
227        let mut metadata_builder = ParquetMetaDataReader::new()
228            .with_metadata_options(Some(options.metadata_options.clone()))
229            .parse_and_finish(&chunk_reader)?
230            .into_builder();
231        let mut predicates = options.predicates;
232
233        // Filter row groups based on the predicates
234        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
235            let mut keep = true;
236            for predicate in &mut predicates {
237                if !predicate(&rg_meta, i) {
238                    keep = false;
239                    break;
240                }
241            }
242            if keep {
243                metadata_builder = metadata_builder.add_row_group(rg_meta);
244            }
245        }
246
247        let mut metadata = metadata_builder.build();
248
249        // If page indexes are desired, build them with the filtered set of row groups
250        if options.enable_page_index {
251            let mut reader =
252                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
253            reader.read_page_indexes(&chunk_reader)?;
254            metadata = reader.finish()?;
255        }
256
257        Ok(Self {
258            chunk_reader: Arc::new(chunk_reader),
259            metadata: Arc::new(metadata),
260            props: Arc::new(options.props),
261        })
262    }
263}
264
265/// Get midpoint offset for a row group
266fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
267    let col = meta.column(0);
268    let mut offset = col.data_page_offset();
269    if let Some(dic_offset) = col.dictionary_page_offset() {
270        if offset > dic_offset {
271            offset = dic_offset
272        }
273    };
274    offset + meta.compressed_size() / 2
275}
276
impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
    fn metadata(&self) -> &ParquetMetaData {
        &self.metadata
    }

    fn num_row_groups(&self) -> usize {
        self.metadata.num_row_groups()
    }

    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
        let row_group_metadata = self.metadata.row_group(i);
        // Row groups should be processed sequentially.
        let props = Arc::clone(&self.props);
        let f = Arc::clone(&self.chunk_reader);
        Ok(Box::new(SerializedRowGroupReader::new(
            f,
            row_group_metadata,
            // Pass this row group's offset index, if one was loaded
            self.metadata.offset_index().map(|x| x[i].as_slice()),
            props,
        )?))
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_file(projection, self)
    }
}
303
/// A serialized implementation for Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    /// Underlying chunk reader, shared with the page readers it spawns
    chunk_reader: Arc<R>,
    /// Metadata for this row group
    metadata: &'a RowGroupMetaData,
    /// Offset index for this row group, if loaded (one entry per column)
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Reader configuration
    props: ReaderPropertiesPtr,
    /// Per-column bloom filters; `None` entries when reading is disabled or
    /// the column has no filter
    bloom_filters: Vec<Option<Sbbf>>,
}
312
313impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
314    /// Creates new row group reader from a file, row group metadata and custom config.
315    pub fn new(
316        chunk_reader: Arc<R>,
317        metadata: &'a RowGroupMetaData,
318        offset_index: Option<&'a [OffsetIndexMetaData]>,
319        props: ReaderPropertiesPtr,
320    ) -> Result<Self> {
321        let bloom_filters = if props.read_bloom_filter() {
322            metadata
323                .columns()
324                .iter()
325                .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
326                .collect::<Result<Vec<_>>>()?
327        } else {
328            std::iter::repeat_n(None, metadata.columns().len()).collect()
329        };
330        Ok(Self {
331            chunk_reader,
332            metadata,
333            offset_index,
334            props,
335            bloom_filters,
336        })
337    }
338}
339
impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
    fn metadata(&self) -> &RowGroupMetaData {
        self.metadata
    }

    fn num_columns(&self) -> usize {
        self.metadata.num_columns()
    }

    // TODO: fix PARQUET-816
    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
        let col = self.metadata.column(i);

        // Page locations (from the offset index) let the page reader seek
        // directly to each page instead of scanning the chunk sequentially.
        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());

        let props = Arc::clone(&self.props);
        Ok(Box::new(SerializedPageReader::new_with_properties(
            Arc::clone(&self.chunk_reader),
            col,
            usize::try_from(self.metadata.num_rows())?,
            page_locations,
            props,
        )?))
    }

    /// get bloom filter for the `i`th column
    // NOTE(review): panics if `i` is out of range, like the indexed accessors above.
    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
        self.bloom_filters[i].as_ref()
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_row_group(projection, self)
    }
}
374
/// Decodes a [`Page`] from the provided `buffer`
///
/// * `page_header` - the already-decoded Thrift header for this page
/// * `buffer` - the page body, still compressed when a codec applies
/// * `physical_type` - the column's physical type, needed to decode page statistics
/// * `decompressor` - codec used to decompress the page body, if any
///
/// Returns an error on CRC mismatch (with the `crc` feature enabled),
/// implausible header values, decompression failure, or an unsupported
/// page type.
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the 32-bit CRC checksum of the page
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // When processing data page v2, depending on enabled compression for the
    // page, we should account for uncompressed data ('offset') of
    // repetition and definition levels.
    //
    // We always use 0 offset for other pages other than v2, `true` flag means
    // that compression will be applied if decompressor is defined
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        // Sanity-check the level byte lengths before using them as an offset
        // into the page body.
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                    for definition_levels_byte_length ({}) \
                    and repetition_levels_byte_length ({}) \
                    given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        // When is_compressed flag is missing the page is considered compressed
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            if offset > buffer.len() || offset > uncompressed_page_size {
                return Err(general_err!("Invalid page header"));
            }
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            // The v2 level bytes are stored uncompressed; copy them through as-is.
            decompressed.extend_from_slice(&buffer[..offset]);
            // decompressed size of zero corresponds to a page with no non-null values
            // see https://github.com/apache/parquet-format/blob/master/README.md#data-pages
            if decompressed_size > 0 {
                let compressed = &buffer[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        // No codec, or a v2 page flagged as uncompressed: use the buffer as-is.
        _ => buffer,
    };

    let result = match page_header.r#type {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: dict_header.encoding,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                def_level_encoding: header.definition_level_encoding,
                rep_level_encoding: header.repetition_level_encoding,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        _ => {
            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
            return Err(general_err!(
                "Page type {:?} is not supported",
                page_header.r#type
            ));
        }
    };

    Ok(result)
}
506
/// Reading state of a [`SerializedPageReader`]: sequential scanning of the
/// chunk (`Values`) or offset-index-driven reads (`Pages`).
enum SerializedPageReaderState {
    Values {
        /// The current byte offset in the reader
        /// Note that offset is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        offset: u64,

        /// The length of the chunk in bytes
        /// Note that remaining_bytes is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        remaining_bytes: u64,

        /// If the next page header has already been "peeked", it is cached here
        next_page_header: Option<Box<PageHeader>>,

        /// The index of the data page within this column chunk
        page_index: usize,

        /// Whether the next page is expected to be a dictionary page
        require_dictionary: bool,
    },
    Pages {
        /// Remaining page locations
        page_locations: VecDeque<PageLocation>,
        /// Remaining dictionary location if any
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
        /// The index of the data page within this column chunk
        page_index: usize,
    },
}
537
/// Auxiliary configuration used when decoding page headers and page data in
/// [`SerializedPageReader`].
#[derive(Default)]
struct SerializedPageReaderContext {
    /// Controls decoding of page-level statistics
    read_stats: bool,
    /// Crypto context carrying objects required for decryption
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
546
/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader
    reader: Arc<R>,

    /// The compression codec for this column chunk. `None` when pages need no
    /// decompression (e.g. the chunk is stored uncompressed; `create_codec`
    /// returns `None` in that case).
    decompressor: Option<Box<dyn Codec>>,

    /// Column chunk type.
    physical_type: Type,

    /// Current read position / remaining pages (see [`SerializedPageReaderState`])
    state: SerializedPageReaderState,

    /// Statistics/decryption configuration for page decoding
    context: SerializedPageReaderContext,
}
562
impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and metadata
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        // Use default reader properties when none are supplied.
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// Stub No-op implementation when encryption is disabled.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Adds any necessary crypto context to this page reader, if encryption is enabled.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        // Nothing to do when the file has no decryptor or this column carries
        // no crypto metadata.
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.context.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page with custom options.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // If the offset of the first page doesn't match the start of the column chunk
                // then the preceding space must contain a dictionary page.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            // Without an offset index, scan the chunk sequentially.
            None => SerializedPageReaderState::Values {
                offset: start,
                remaining_bytes: len,
                next_page_header: None,
                page_index: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        let mut context = SerializedPageReaderContext::default();
        if props.read_page_stats() {
            context.read_stats = true;
        }
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context,
        })
    }

    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
    /// Unlike page metadata, an offset can uniquely identify a page.
    ///
    /// This is used when we need to read parquet with row-filter, and we don't want to decompress the page twice.
    /// This function allows us to check if the next page is being cached or read previously.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                // Loop until a supported page header is found or the chunk is
                // exhausted; unsupported headers are skipped.
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        // A header was already peeked; reuse it if supported.
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        // Read (and cache) the next header from the chunk.
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                // Offset-index mode: the dictionary page (if any) comes first,
                // then the remaining data pages in order.
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as u64))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as u64))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Reads a page header from `input`, returning the header together with
    /// the number of bytes consumed from the stream.
    fn read_page_header_len<T: Read>(
        context: &SerializedPageReaderContext,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
        struct TrackedRead<R> {
            inner: R,
            bytes_read: usize,
        }

        impl<R: Read> Read for TrackedRead<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let v = self.inner.read(buf)?;
                self.bytes_read += v;
                Ok(v)
            }
        }

        let mut tracked = TrackedRead {
            inner: input,
            bytes_read: 0,
        };
        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
        Ok((tracked.bytes_read, header))
    }

    /// Reads a page header from an in-memory `buffer`, returning the number of
    /// bytes consumed (the cursor position) together with the header.
    fn read_page_header_len_from_bytes(
        context: &SerializedPageReaderContext,
        buffer: &[u8],
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        let mut input = std::io::Cursor::new(buffer);
        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
        let header_len = input.position() as usize;
        Ok((header_len, header))
    }
}
770
771#[cfg(not(feature = "encryption"))]
772impl SerializedPageReaderContext {
773    fn read_page_header<T: Read>(
774        &self,
775        input: &mut T,
776        _page_index: usize,
777        _dictionary_page: bool,
778    ) -> Result<PageHeader> {
779        let mut prot = ThriftReadInputProtocol::new(input);
780        if self.read_stats {
781            Ok(PageHeader::read_thrift(&mut prot)?)
782        } else {
783            Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
784        }
785    }
786
787    fn decrypt_page_data<T>(
788        &self,
789        buffer: T,
790        _page_index: usize,
791        _dictionary_page: bool,
792    ) -> Result<T> {
793        Ok(buffer)
794    }
795}
796
#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    /// Reads a page header from `input`, decrypting it first when a crypto
    /// context is configured for this column chunk.
    ///
    /// `page_index` and `dictionary_page` identify the page so the correct
    /// AAD (additional authenticated data) can be built for decryption.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                // Plaintext header: decode directly from the reader.
                let mut prot = ThriftReadInputProtocol::new(input);
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    // NOTE: `PageHeader` is already imported at file scope; the
                    // previous local re-import here was redundant and has been removed.
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                // Decryption failure most commonly means a wrong key, so
                // surface a column-specific hint rather than the raw error.
                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                // Decode the decrypted plaintext from an in-memory slice.
                let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
        }
    }

    /// Decrypts a page's data buffer when a crypto context is configured for
    /// this column chunk; otherwise returns the buffer unchanged.
    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    /// Returns the crypto context specialized for the given page, or `None`
    /// when this column chunk is not encrypted.
    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}
867
868impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
869    type Item = Result<Page>;
870
871    fn next(&mut self) -> Option<Self::Item> {
872        self.get_next_page().transpose()
873    }
874}
875
876fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
877    if header_len as u64 > remaining_bytes {
878        return Err(eof_err!("Invalid page header"));
879    }
880    Ok(())
881}
882
883fn verify_page_size(
884    compressed_size: i32,
885    uncompressed_size: i32,
886    remaining_bytes: u64,
887) -> Result<()> {
888    // The page's compressed size should not exceed the remaining bytes that are
889    // available to read. The page's uncompressed size is the expected size
890    // after decompression, which can never be negative.
891    if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
892        return Err(eof_err!("Invalid page header"));
893    }
894    Ok(())
895}
896
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    /// Reads and decodes the next page from the column chunk, returning
    /// `None` once the chunk is exhausted. INDEX_PAGEs are skipped.
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                // Sequential scan: no offset index is available, so page
                // headers are discovered by reading forward through the chunk,
                // tracking the current offset and bytes remaining.
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset)?;
                    // Reuse a header buffered by `peek_next_page` if present
                    // (the offset/remaining were already advanced then),
                    // otherwise read one from the stream now.
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len as u64;
                        *remaining -= header_len as u64;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    let data_start = *offset;
                    *offset += data_len as u64;
                    *remaining -= data_len as u64;

                    // Index pages are not decoded; skip past their data.
                    if header.r#type == PageType::INDEX_PAGE {
                        continue;
                    }

                    let buffer = self.reader.get_bytes(data_start, data_len)?;

                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        buffer,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    // Data pages advance the page ordinal; seeing the
                    // dictionary page clears the dictionary-expected flag.
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                // Indexed scan: page locations are known up front, so each
                // page can be fetched directly by offset and length.
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    // The dictionary page (if any) is served before data pages.
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    // The fetched buffer covers both the page header and data.
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    /// Returns metadata for the next page without consuming it. In the
    /// sequential (`Values`) state the header read here is buffered so
    /// `get_next_page` / `skip_next_page` do not re-read it.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        // Advance past the header now; the buffered header
                        // below records that this was already done.
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    // Dictionary pages carry no row counts.
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // A page's row count is the next page's first row index
                    // (or the row-group total for the last page) minus its own.
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Skips the next page without decoding it, advancing the reader's
    /// position (and page-tracking state) past its header and data.
    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The next page header has already been peeked, so just advance the offset
                    *offset += buffered_header.compressed_page_size as u64;
                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
                } else {
                    let mut read = self.reader.get_read(*offset)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // Skip both the header just read and the page data.
                    let data_page_size = header.compressed_page_size as u64;
                    *offset += header_len as u64 + data_page_size;
                    *remaining_bytes -= header_len as u64 + data_page_size;
                }
                if *require_dictionary {
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // If a dictionary page exists, consume it by taking it (sets to None)
                    dictionary_page.take();
                } else {
                    // If no dictionary page exists, simply pop the data page from page_locations
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }

    /// Reports whether the reader is positioned at a record boundary. With an
    /// offset index (`Pages`) every page starts on a record boundary; without
    /// one, only the end of the chunk is known to be a boundary.
    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
1148
1149#[cfg(test)]
1150mod tests {
1151    use std::collections::HashSet;
1152
1153    use bytes::Buf;
1154
1155    use crate::file::page_index::column_index::{
1156        ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
1157    };
1158    use crate::file::properties::{EnabledStatistics, WriterProperties};
1159
1160    use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
1161    use crate::column::reader::ColumnReader;
1162    use crate::data_type::private::ParquetValueType;
1163    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1164    use crate::file::metadata::thrift::DataPageHeaderV2;
1165    #[allow(deprecated)]
1166    use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1167    use crate::file::writer::SerializedFileWriter;
1168    use crate::record::RowAccessor;
1169    use crate::schema::parser::parse_message_type;
1170    use crate::util::test_common::file_util::{get_test_file, get_test_path};
1171
1172    use super::*;
1173
1174    #[test]
1175    fn test_decode_page_invalid_offset() {
1176        let page_header = PageHeader {
1177            r#type: PageType::DATA_PAGE_V2,
1178            uncompressed_page_size: 10,
1179            compressed_page_size: 10,
1180            data_page_header: None,
1181            index_page_header: None,
1182            dictionary_page_header: None,
1183            crc: None,
1184            data_page_header_v2: Some(DataPageHeaderV2 {
1185                num_nulls: 0,
1186                num_rows: 0,
1187                num_values: 0,
1188                encoding: Encoding::PLAIN,
1189                definition_levels_byte_length: 11,
1190                repetition_levels_byte_length: 0,
1191                is_compressed: None,
1192                statistics: None,
1193            }),
1194        };
1195
1196        let buffer = Bytes::new();
1197        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1198        assert!(
1199            err.to_string()
1200                .contains("DataPage v2 header contains implausible values")
1201        );
1202    }
1203
1204    #[test]
1205    fn test_decode_unsupported_page() {
1206        let mut page_header = PageHeader {
1207            r#type: PageType::INDEX_PAGE,
1208            uncompressed_page_size: 10,
1209            compressed_page_size: 10,
1210            data_page_header: None,
1211            index_page_header: None,
1212            dictionary_page_header: None,
1213            crc: None,
1214            data_page_header_v2: None,
1215        };
1216        let buffer = Bytes::new();
1217        let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
1218        assert_eq!(
1219            err.to_string(),
1220            "Parquet error: Page type INDEX_PAGE is not supported"
1221        );
1222
1223        page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
1224            num_nulls: 0,
1225            num_rows: 0,
1226            num_values: 0,
1227            encoding: Encoding::PLAIN,
1228            definition_levels_byte_length: 11,
1229            repetition_levels_byte_length: 0,
1230            is_compressed: None,
1231            statistics: None,
1232        });
1233        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1234        assert!(
1235            err.to_string()
1236                .contains("DataPage v2 header contains implausible values")
1237        );
1238    }
1239
1240    #[test]
1241    fn test_cursor_and_file_has_the_same_behaviour() {
1242        let mut buf: Vec<u8> = Vec::new();
1243        get_test_file("alltypes_plain.parquet")
1244            .read_to_end(&mut buf)
1245            .unwrap();
1246        let cursor = Bytes::from(buf);
1247        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
1248
1249        let test_file = get_test_file("alltypes_plain.parquet");
1250        let read_from_file = SerializedFileReader::new(test_file).unwrap();
1251
1252        let file_iter = read_from_file.get_row_iter(None).unwrap();
1253        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
1254
1255        for (a, b) in file_iter.zip(cursor_iter) {
1256            assert_eq!(a.unwrap(), b.unwrap())
1257        }
1258    }
1259
1260    #[test]
1261    fn test_file_reader_try_from() {
1262        // Valid file path
1263        let test_file = get_test_file("alltypes_plain.parquet");
1264        let test_path_buf = get_test_path("alltypes_plain.parquet");
1265        let test_path = test_path_buf.as_path();
1266        let test_path_str = test_path.to_str().unwrap();
1267
1268        let reader = SerializedFileReader::try_from(test_file);
1269        assert!(reader.is_ok());
1270
1271        let reader = SerializedFileReader::try_from(test_path);
1272        assert!(reader.is_ok());
1273
1274        let reader = SerializedFileReader::try_from(test_path_str);
1275        assert!(reader.is_ok());
1276
1277        let reader = SerializedFileReader::try_from(test_path_str.to_string());
1278        assert!(reader.is_ok());
1279
1280        // Invalid file path
1281        let test_path = Path::new("invalid.parquet");
1282        let test_path_str = test_path.to_str().unwrap();
1283
1284        let reader = SerializedFileReader::try_from(test_path);
1285        assert!(reader.is_err());
1286
1287        let reader = SerializedFileReader::try_from(test_path_str);
1288        assert!(reader.is_err());
1289
1290        let reader = SerializedFileReader::try_from(test_path_str.to_string());
1291        assert!(reader.is_err());
1292    }
1293
1294    #[test]
1295    fn test_file_reader_into_iter() {
1296        let path = get_test_path("alltypes_plain.parquet");
1297        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1298        let iter = reader.into_iter();
1299        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1300
1301        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1302    }
1303
1304    #[test]
1305    fn test_file_reader_into_iter_project() {
1306        let path = get_test_path("alltypes_plain.parquet");
1307        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1308        let schema = "message schema { OPTIONAL INT32 id; }";
1309        let proj = parse_message_type(schema).ok();
1310        let iter = reader.into_iter().project(proj).unwrap();
1311        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1312
1313        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1314    }
1315
1316    #[test]
1317    fn test_reuse_file_chunk() {
1318        // This test covers the case of maintaining the correct start position in a file
1319        // stream for each column reader after initializing and moving to the next one
1320        // (without necessarily reading the entire column).
1321        let test_file = get_test_file("alltypes_plain.parquet");
1322        let reader = SerializedFileReader::new(test_file).unwrap();
1323        let row_group = reader.get_row_group(0).unwrap();
1324
1325        let mut page_readers = Vec::new();
1326        for i in 0..row_group.num_columns() {
1327            page_readers.push(row_group.get_column_page_reader(i).unwrap());
1328        }
1329
1330        // Now buffer each col reader, we do not expect any failures like:
1331        // General("underlying Thrift error: end of file")
1332        for mut page_reader in page_readers {
1333            assert!(page_reader.get_next_page().is_ok());
1334        }
1335    }
1336
    // End-to-end read of `alltypes_plain.parquet`, checking file metadata,
    // row-group metadata, and the exact pages produced for column 0.
    // Expected values were established against this fixture file.
    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Test contents in row group metadata
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    // BIT_PACKED is deprecated but still what this file uses.
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        // Exactly one dictionary page followed by one data page.
        assert_eq!(page_count, 2);
    }
1428
    // End-to-end read of `datapage_v2.snappy.parquet`, exercising the
    // DataPageV2 decode path; expected values match this fixture file.
    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none()); // page stats are no longer read
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        // Exactly one dictionary page followed by one v2 data page.
        assert_eq!(page_count, 2);
    }
1526
1527    #[test]
1528    fn test_file_reader_empty_compressed_datapage_v2() {
1529        // this file has a compressed datapage that un-compresses to 0 bytes
1530        let test_file = get_test_file("page_v2_empty_compressed.parquet");
1531        let reader_result = SerializedFileReader::new(test_file);
1532        assert!(reader_result.is_ok());
1533        let reader = reader_result.unwrap();
1534
1535        // Test contents in Parquet metadata
1536        let metadata = reader.metadata();
1537        assert_eq!(metadata.num_row_groups(), 1);
1538
1539        // Test contents in file metadata
1540        let file_metadata = metadata.file_metadata();
1541        assert!(file_metadata.created_by().is_some());
1542        assert_eq!(
1543            file_metadata.created_by().unwrap(),
1544            "parquet-cpp-arrow version 14.0.2"
1545        );
1546        assert!(file_metadata.key_value_metadata().is_some());
1547        assert_eq!(
1548            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1549            1
1550        );
1551
1552        assert_eq!(file_metadata.num_rows(), 10);
1553        assert_eq!(file_metadata.version(), 2);
1554        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1555        assert_eq!(
1556            file_metadata.column_orders(),
1557            Some(vec![expected_order].as_ref())
1558        );
1559
1560        let row_group_metadata = metadata.row_group(0);
1561
1562        // Check each column order
1563        for i in 0..row_group_metadata.num_columns() {
1564            assert_eq!(file_metadata.column_order(i), expected_order);
1565        }
1566
1567        // Test row group reader
1568        let row_group_reader_result = reader.get_row_group(0);
1569        assert!(row_group_reader_result.is_ok());
1570        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1571        assert_eq!(
1572            row_group_reader.num_columns(),
1573            row_group_metadata.num_columns()
1574        );
1575        assert_eq!(
1576            row_group_reader.metadata().total_byte_size(),
1577            row_group_metadata.total_byte_size()
1578        );
1579
1580        // Test page readers
1581        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1582        assert!(page_reader_0_result.is_ok());
1583        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1584        let mut page_count = 0;
1585        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1586            let is_expected_page = match page {
1587                Page::DictionaryPage {
1588                    buf,
1589                    num_values,
1590                    encoding,
1591                    is_sorted,
1592                } => {
1593                    assert_eq!(buf.len(), 0);
1594                    assert_eq!(num_values, 0);
1595                    assert_eq!(encoding, Encoding::PLAIN);
1596                    assert!(!is_sorted);
1597                    true
1598                }
1599                Page::DataPageV2 {
1600                    buf,
1601                    num_values,
1602                    encoding,
1603                    num_nulls,
1604                    num_rows,
1605                    def_levels_byte_len,
1606                    rep_levels_byte_len,
1607                    is_compressed,
1608                    statistics,
1609                } => {
1610                    assert_eq!(buf.len(), 3);
1611                    assert_eq!(num_values, 10);
1612                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1613                    assert_eq!(num_nulls, 10);
1614                    assert_eq!(num_rows, 10);
1615                    assert_eq!(def_levels_byte_len, 2);
1616                    assert_eq!(rep_levels_byte_len, 0);
1617                    assert!(is_compressed);
1618                    assert!(statistics.is_none()); // page stats are no longer read
1619                    true
1620                }
1621                _ => false,
1622            };
1623            assert!(is_expected_page);
1624            page_count += 1;
1625        }
1626        assert_eq!(page_count, 2);
1627    }
1628
1629    #[test]
1630    fn test_file_reader_empty_datapage_v2() {
1631        // this file has 0 bytes compressed datapage that un-compresses to 0 bytes
1632        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
1633        let reader_result = SerializedFileReader::new(test_file);
1634        assert!(reader_result.is_ok());
1635        let reader = reader_result.unwrap();
1636
1637        // Test contents in Parquet metadata
1638        let metadata = reader.metadata();
1639        assert_eq!(metadata.num_row_groups(), 1);
1640
1641        // Test contents in file metadata
1642        let file_metadata = metadata.file_metadata();
1643        assert!(file_metadata.created_by().is_some());
1644        assert_eq!(
1645            file_metadata.created_by().unwrap(),
1646            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
1647        );
1648        assert!(file_metadata.key_value_metadata().is_some());
1649        assert_eq!(
1650            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1651            2
1652        );
1653
1654        assert_eq!(file_metadata.num_rows(), 1);
1655        assert_eq!(file_metadata.version(), 1);
1656        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1657        assert_eq!(
1658            file_metadata.column_orders(),
1659            Some(vec![expected_order].as_ref())
1660        );
1661
1662        let row_group_metadata = metadata.row_group(0);
1663
1664        // Check each column order
1665        for i in 0..row_group_metadata.num_columns() {
1666            assert_eq!(file_metadata.column_order(i), expected_order);
1667        }
1668
1669        // Test row group reader
1670        let row_group_reader_result = reader.get_row_group(0);
1671        assert!(row_group_reader_result.is_ok());
1672        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1673        assert_eq!(
1674            row_group_reader.num_columns(),
1675            row_group_metadata.num_columns()
1676        );
1677        assert_eq!(
1678            row_group_reader.metadata().total_byte_size(),
1679            row_group_metadata.total_byte_size()
1680        );
1681
1682        // Test page readers
1683        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1684        assert!(page_reader_0_result.is_ok());
1685        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1686        let mut page_count = 0;
1687        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1688            let is_expected_page = match page {
1689                Page::DataPageV2 {
1690                    buf,
1691                    num_values,
1692                    encoding,
1693                    num_nulls,
1694                    num_rows,
1695                    def_levels_byte_len,
1696                    rep_levels_byte_len,
1697                    is_compressed,
1698                    statistics,
1699                } => {
1700                    assert_eq!(buf.len(), 2);
1701                    assert_eq!(num_values, 1);
1702                    assert_eq!(encoding, Encoding::PLAIN);
1703                    assert_eq!(num_nulls, 1);
1704                    assert_eq!(num_rows, 1);
1705                    assert_eq!(def_levels_byte_len, 2);
1706                    assert_eq!(rep_levels_byte_len, 0);
1707                    assert!(is_compressed);
1708                    assert!(statistics.is_none());
1709                    true
1710                }
1711                _ => false,
1712            };
1713            assert!(is_expected_page);
1714            page_count += 1;
1715        }
1716        assert_eq!(page_count, 1);
1717    }
1718
1719    fn get_serialized_page_reader<R: ChunkReader>(
1720        file_reader: &SerializedFileReader<R>,
1721        row_group: usize,
1722        column: usize,
1723    ) -> Result<SerializedPageReader<R>> {
1724        let row_group = {
1725            let row_group_metadata = file_reader.metadata.row_group(row_group);
1726            let props = Arc::clone(&file_reader.props);
1727            let f = Arc::clone(&file_reader.chunk_reader);
1728            SerializedRowGroupReader::new(
1729                f,
1730                row_group_metadata,
1731                file_reader
1732                    .metadata
1733                    .offset_index()
1734                    .map(|x| x[row_group].as_slice()),
1735                props,
1736            )?
1737        };
1738
1739        let col = row_group.metadata.column(column);
1740
1741        let page_locations = row_group
1742            .offset_index
1743            .map(|x| x[column].page_locations.clone());
1744
1745        let props = Arc::clone(&row_group.props);
1746        SerializedPageReader::new_with_properties(
1747            Arc::clone(&row_group.chunk_reader),
1748            col,
1749            usize::try_from(row_group.metadata.num_rows())?,
1750            page_locations,
1751            props,
1752        )
1753    }
1754
    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        // Every page offset returned by `peek_next_page_offset` must be unique
        // across the whole file; collect them here to verify that.
        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                // For each remaining page: the peeked offset must agree with
                // the page reader's internal state for the page about to be
                // read, in either state variant.
                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            // Offset-index-driven state: the next page is the
                            // pending dictionary page if any, otherwise the
                            // front of the page-location queue.
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as u64, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as u64, page_offset);
                            } else {
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            // Sequential-scan state: peeking must have
                            // buffered the next page header and left `offset`
                            // pointing at that page's start.
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    // Consuming the page must succeed, and its offset must not
                    // have been reported before.
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }
1801
1802    #[test]
1803    fn test_page_iterator() {
1804        let file = get_test_file("alltypes_plain.parquet");
1805        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1806
1807        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1808
1809        // read first page
1810        let page = page_iterator.next();
1811        assert!(page.is_some());
1812        assert!(page.unwrap().is_ok());
1813
1814        // reach end of file
1815        let page = page_iterator.next();
1816        assert!(page.is_none());
1817
1818        let row_group_indices = Box::new(0..1);
1819        let mut page_iterator =
1820            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1821
1822        // read first page
1823        let page = page_iterator.next();
1824        assert!(page.is_some());
1825        assert!(page.unwrap().is_ok());
1826
1827        // reach end of file
1828        let page = page_iterator.next();
1829        assert!(page.is_none());
1830    }
1831
1832    #[test]
1833    fn test_file_reader_key_value_metadata() {
1834        let file = get_test_file("binary.parquet");
1835        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1836
1837        let metadata = file_reader
1838            .metadata
1839            .file_metadata()
1840            .key_value_metadata()
1841            .unwrap();
1842
1843        assert_eq!(metadata.len(), 3);
1844
1845        assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1846
1847        assert_eq!(metadata[1].key, "writer.model.name");
1848        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1849
1850        assert_eq!(metadata[2].key, "parquet.proto.class");
1851        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1852    }
1853
1854    #[test]
1855    fn test_file_reader_optional_metadata() {
1856        // file with optional metadata: bloom filters, encoding stats, column index and offset index.
1857        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1858        let options = ReadOptionsBuilder::new()
1859            .with_encoding_stats_as_mask(false)
1860            .build();
1861        let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());
1862
1863        let row_group_metadata = file_reader.metadata.row_group(0);
1864        let col0_metadata = row_group_metadata.column(0);
1865
1866        // test optional bloom filter offset
1867        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1868
1869        // test page encoding stats
1870        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1871
1872        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1873        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1874        assert_eq!(page_encoding_stats.count, 1);
1875
1876        // test optional column index offset
1877        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1878        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1879
1880        // test optional offset index offset
1881        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1882        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1883    }
1884
1885    #[test]
1886    fn test_file_reader_page_stats_mask() {
1887        let file = get_test_file("alltypes_tiny_pages.parquet");
1888        let options = ReadOptionsBuilder::new()
1889            .with_encoding_stats_as_mask(true)
1890            .build();
1891        let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());
1892
1893        let row_group_metadata = file_reader.metadata.row_group(0);
1894
1895        // test page encoding stats
1896        let page_encoding_stats = row_group_metadata
1897            .column(0)
1898            .page_encoding_stats_mask()
1899            .unwrap();
1900        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1901        let page_encoding_stats = row_group_metadata
1902            .column(2)
1903            .page_encoding_stats_mask()
1904            .unwrap();
1905        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1906    }
1907
1908    #[test]
1909    fn test_file_reader_page_stats_skipped() {
1910        let file = get_test_file("alltypes_tiny_pages.parquet");
1911
1912        // test skipping all
1913        let options = ReadOptionsBuilder::new()
1914            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1915            .build();
1916        let file_reader = Arc::new(
1917            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1918        );
1919
1920        let row_group_metadata = file_reader.metadata.row_group(0);
1921        for column in row_group_metadata.columns() {
1922            assert!(column.page_encoding_stats().is_none());
1923            assert!(column.page_encoding_stats_mask().is_none());
1924        }
1925
1926        // test skipping all but one column
1927        let options = ReadOptionsBuilder::new()
1928            .with_encoding_stats_as_mask(true)
1929            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1930            .build();
1931        let file_reader = Arc::new(
1932            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1933        );
1934
1935        let row_group_metadata = file_reader.metadata.row_group(0);
1936        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1937            assert!(column.page_encoding_stats().is_none());
1938            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1939        }
1940    }
1941
1942    #[test]
1943    fn test_file_reader_with_no_filter() -> Result<()> {
1944        let test_file = get_test_file("alltypes_plain.parquet");
1945        let origin_reader = SerializedFileReader::new(test_file)?;
1946        // test initial number of row groups
1947        let metadata = origin_reader.metadata();
1948        assert_eq!(metadata.num_row_groups(), 1);
1949        Ok(())
1950    }
1951
1952    #[test]
1953    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1954        let test_file = get_test_file("alltypes_plain.parquet");
1955        let read_options = ReadOptionsBuilder::new()
1956            .with_predicate(Box::new(|_, _| false))
1957            .build();
1958        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1959        let metadata = reader.metadata();
1960        assert_eq!(metadata.num_row_groups(), 0);
1961        Ok(())
1962    }
1963
1964    #[test]
1965    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1966        let test_file = get_test_file("alltypes_plain.parquet");
1967        let origin_reader = SerializedFileReader::new(test_file)?;
1968        // test initial number of row groups
1969        let metadata = origin_reader.metadata();
1970        assert_eq!(metadata.num_row_groups(), 1);
1971        let mid = get_midpoint_offset(metadata.row_group(0));
1972
1973        let test_file = get_test_file("alltypes_plain.parquet");
1974        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1975        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1976        let metadata = reader.metadata();
1977        assert_eq!(metadata.num_row_groups(), 1);
1978
1979        let test_file = get_test_file("alltypes_plain.parquet");
1980        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1981        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1982        let metadata = reader.metadata();
1983        assert_eq!(metadata.num_row_groups(), 0);
1984        Ok(())
1985    }
1986
1987    #[test]
1988    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1989        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1990        let origin_reader = SerializedFileReader::new(test_file)?;
1991        let metadata = origin_reader.metadata();
1992        let mid = get_midpoint_offset(metadata.row_group(0));
1993
1994        // true, true predicate
1995        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1996        let read_options = ReadOptionsBuilder::new()
1997            .with_page_index()
1998            .with_predicate(Box::new(|_, _| true))
1999            .with_range(mid, mid + 1)
2000            .build();
2001        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2002        let metadata = reader.metadata();
2003        assert_eq!(metadata.num_row_groups(), 1);
2004        assert_eq!(metadata.column_index().unwrap().len(), 1);
2005        assert_eq!(metadata.offset_index().unwrap().len(), 1);
2006
2007        // true, false predicate
2008        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2009        let read_options = ReadOptionsBuilder::new()
2010            .with_page_index()
2011            .with_predicate(Box::new(|_, _| true))
2012            .with_range(0, mid)
2013            .build();
2014        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2015        let metadata = reader.metadata();
2016        assert_eq!(metadata.num_row_groups(), 0);
2017        assert!(metadata.column_index().is_none());
2018        assert!(metadata.offset_index().is_none());
2019
2020        // false, true predicate
2021        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2022        let read_options = ReadOptionsBuilder::new()
2023            .with_page_index()
2024            .with_predicate(Box::new(|_, _| false))
2025            .with_range(mid, mid + 1)
2026            .build();
2027        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2028        let metadata = reader.metadata();
2029        assert_eq!(metadata.num_row_groups(), 0);
2030        assert!(metadata.column_index().is_none());
2031        assert!(metadata.offset_index().is_none());
2032
2033        // false, false predicate
2034        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2035        let read_options = ReadOptionsBuilder::new()
2036            .with_page_index()
2037            .with_predicate(Box::new(|_, _| false))
2038            .with_range(0, mid)
2039            .build();
2040        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2041        let metadata = reader.metadata();
2042        assert_eq!(metadata.num_row_groups(), 0);
2043        assert!(metadata.column_index().is_none());
2044        assert!(metadata.offset_index().is_none());
2045        Ok(())
2046    }
2047
2048    #[test]
2049    fn test_file_reader_invalid_metadata() {
2050        let data = [
2051            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
2052            80, 65, 82, 49,
2053        ];
2054        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
2055        assert_eq!(
2056            ret.err().unwrap().to_string(),
2057            "Parquet error: Received empty union from remote ColumnOrder"
2058        );
2059    }
2060
    #[test]
    // Expected page index values below were obtained with java parquet-tools:
    //
    //   parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
    //   row group 0:
    //   column index for column String:
    //   Boundary order: ASCENDING
    //   page-0  :
    //   null count                 min                                  max
    //   0                          Hello                                today
    //
    //   offset index for column String:
    //   page-0   :
    //   offset   compressed size       first row index
    //   4               152                     0
    fn test_page_index_reader() {
        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let builder = ReadOptionsBuilder::new();
        // Enable reading the page index (column index + offset index).
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let column_index = metadata.column_index().unwrap();

        // only one row group
        assert_eq!(column_index.len(), 1);
        // The single column is a BYTE_ARRAY (String) column.
        let index = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
            index
        } else {
            unreachable!()
        };

        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);

        // only one page in the column index
        assert_eq!(index.num_pages(), 1);

        // Min/max must match the parquet-tools output above.
        let min = index.min_value(0).unwrap();
        let max = index.max_value(0).unwrap();
        assert_eq!(b"Hello", min.as_bytes());
        assert_eq!(b"today", max.as_bytes());

        let offset_indexes = metadata.offset_index().unwrap();
        // only one row group
        assert_eq!(offset_indexes.len(), 1);
        let offset_index = &offset_indexes[0];
        let page_offset = &offset_index[0].page_locations()[0];

        // Offset, compressed size and first row index of the single page.
        assert_eq!(4, page_offset.offset);
        assert_eq!(152, page_offset.compressed_page_size);
        assert_eq!(0, page_offset.first_row_index);
    }
2120
2121    #[test]
2122    #[allow(deprecated)]
2123    fn test_page_index_reader_out_of_order() {
2124        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2125        let options = ReadOptionsBuilder::new().with_page_index().build();
2126        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
2127        let metadata = reader.metadata();
2128
2129        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2130        let columns = metadata.row_group(0).columns();
2131        let reversed: Vec<_> = columns.iter().cloned().rev().collect();
2132
2133        let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
2134        let mut b = read_columns_indexes(&test_file, &reversed)
2135            .unwrap()
2136            .unwrap();
2137        b.reverse();
2138        assert_eq!(a, b);
2139
2140        let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
2141        let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
2142        b.reverse();
2143        assert_eq!(a, b);
2144    }
2145
2146    #[test]
2147    fn test_page_index_reader_all_type() {
2148        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2149        let builder = ReadOptionsBuilder::new();
2150        //enable read page index
2151        let options = builder.with_page_index().build();
2152        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2153        let reader = reader_result.unwrap();
2154
2155        // Test contents in Parquet metadata
2156        let metadata = reader.metadata();
2157        assert_eq!(metadata.num_row_groups(), 1);
2158
2159        let column_index = metadata.column_index().unwrap();
2160        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
2161
2162        // only one row group
2163        assert_eq!(column_index.len(), 1);
2164        let row_group_metadata = metadata.row_group(0);
2165
2166        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
2167        assert!(!&column_index[0][0].is_sorted());
2168        let boundary_order = &column_index[0][0].get_boundary_order();
2169        assert!(boundary_order.is_some());
2170        matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
2171        if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
2172            check_native_page_index(
2173                index,
2174                325,
2175                get_row_group_min_max_bytes(row_group_metadata, 0),
2176                BoundaryOrder::UNORDERED,
2177            );
2178            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2179        } else {
2180            unreachable!()
2181        };
2182        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
2183        assert!(&column_index[0][1].is_sorted());
2184        if let ColumnIndexMetaData::BOOLEAN(index) = &column_index[0][1] {
2185            assert_eq!(index.num_pages(), 82);
2186            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2187        } else {
2188            unreachable!()
2189        };
2190        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2191        assert!(&column_index[0][2].is_sorted());
2192        if let ColumnIndexMetaData::INT32(index) = &column_index[0][2] {
2193            check_native_page_index(
2194                index,
2195                325,
2196                get_row_group_min_max_bytes(row_group_metadata, 2),
2197                BoundaryOrder::ASCENDING,
2198            );
2199            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2200        } else {
2201            unreachable!()
2202        };
2203        //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2204        assert!(&column_index[0][3].is_sorted());
2205        if let ColumnIndexMetaData::INT32(index) = &column_index[0][3] {
2206            check_native_page_index(
2207                index,
2208                325,
2209                get_row_group_min_max_bytes(row_group_metadata, 3),
2210                BoundaryOrder::ASCENDING,
2211            );
2212            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2213        } else {
2214            unreachable!()
2215        };
2216        //col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2217        assert!(&column_index[0][4].is_sorted());
2218        if let ColumnIndexMetaData::INT32(index) = &column_index[0][4] {
2219            check_native_page_index(
2220                index,
2221                325,
2222                get_row_group_min_max_bytes(row_group_metadata, 4),
2223                BoundaryOrder::ASCENDING,
2224            );
2225            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2226        } else {
2227            unreachable!()
2228        };
2229        //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
2230        assert!(!&column_index[0][5].is_sorted());
2231        if let ColumnIndexMetaData::INT64(index) = &column_index[0][5] {
2232            check_native_page_index(
2233                index,
2234                528,
2235                get_row_group_min_max_bytes(row_group_metadata, 5),
2236                BoundaryOrder::UNORDERED,
2237            );
2238            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2239        } else {
2240            unreachable!()
2241        };
2242        //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
2243        assert!(&column_index[0][6].is_sorted());
2244        if let ColumnIndexMetaData::FLOAT(index) = &column_index[0][6] {
2245            check_native_page_index(
2246                index,
2247                325,
2248                get_row_group_min_max_bytes(row_group_metadata, 6),
2249                BoundaryOrder::ASCENDING,
2250            );
2251            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2252        } else {
2253            unreachable!()
2254        };
2255        //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
2256        assert!(!&column_index[0][7].is_sorted());
2257        if let ColumnIndexMetaData::DOUBLE(index) = &column_index[0][7] {
2258            check_native_page_index(
2259                index,
2260                528,
2261                get_row_group_min_max_bytes(row_group_metadata, 7),
2262                BoundaryOrder::UNORDERED,
2263            );
2264            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2265        } else {
2266            unreachable!()
2267        };
2268        //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
2269        assert!(!&column_index[0][8].is_sorted());
2270        if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][8] {
2271            check_byte_array_page_index(
2272                index,
2273                974,
2274                get_row_group_min_max_bytes(row_group_metadata, 8),
2275                BoundaryOrder::UNORDERED,
2276            );
2277            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2278        } else {
2279            unreachable!()
2280        };
2281        //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2282        assert!(&column_index[0][9].is_sorted());
2283        if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][9] {
2284            check_byte_array_page_index(
2285                index,
2286                352,
2287                get_row_group_min_max_bytes(row_group_metadata, 9),
2288                BoundaryOrder::ASCENDING,
2289            );
2290            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2291        } else {
2292            unreachable!()
2293        };
2294        //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
        //Notice: min_max values for each page of this column do not exist.
2296        assert!(!&column_index[0][10].is_sorted());
2297        if let ColumnIndexMetaData::NONE = &column_index[0][10] {
2298            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2299        } else {
2300            unreachable!()
2301        };
2302        //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
2303        assert!(&column_index[0][11].is_sorted());
2304        if let ColumnIndexMetaData::INT32(index) = &column_index[0][11] {
2305            check_native_page_index(
2306                index,
2307                325,
2308                get_row_group_min_max_bytes(row_group_metadata, 11),
2309                BoundaryOrder::ASCENDING,
2310            );
2311            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2312        } else {
2313            unreachable!()
2314        };
2315        //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
2316        assert!(!&column_index[0][12].is_sorted());
2317        if let ColumnIndexMetaData::INT32(index) = &column_index[0][12] {
2318            check_native_page_index(
2319                index,
2320                325,
2321                get_row_group_min_max_bytes(row_group_metadata, 12),
2322                BoundaryOrder::UNORDERED,
2323            );
2324            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2325        } else {
2326            unreachable!()
2327        };
2328    }
2329
2330    fn check_native_page_index<T: ParquetValueType>(
2331        row_group_index: &PrimitiveColumnIndex<T>,
2332        page_size: usize,
2333        min_max: (&[u8], &[u8]),
2334        boundary_order: BoundaryOrder,
2335    ) {
2336        assert_eq!(row_group_index.num_pages() as usize, page_size);
2337        assert_eq!(row_group_index.boundary_order, boundary_order);
2338        assert!(row_group_index.min_values().iter().all(|x| {
2339            x >= &T::try_from_le_slice(min_max.0).unwrap()
2340                && x <= &T::try_from_le_slice(min_max.1).unwrap()
2341        }));
2342    }
2343
2344    fn check_byte_array_page_index(
2345        row_group_index: &ByteArrayColumnIndex,
2346        page_size: usize,
2347        min_max: (&[u8], &[u8]),
2348        boundary_order: BoundaryOrder,
2349    ) {
2350        assert_eq!(row_group_index.num_pages() as usize, page_size);
2351        assert_eq!(row_group_index.boundary_order, boundary_order);
2352        for i in 0..row_group_index.num_pages() as usize {
2353            let x = row_group_index.min_value(i).unwrap();
2354            assert!(x >= min_max.0 && x <= min_max.1);
2355        }
2356    }
2357
2358    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2359        let statistics = r.column(col_num).statistics().unwrap();
2360        (
2361            statistics.min_bytes_opt().unwrap_or_default(),
2362            statistics.max_bytes_opt().unwrap_or_default(),
2363        )
2364    }
2365
2366    #[test]
2367    fn test_skip_next_page_with_dictionary_page() {
2368        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2369        let builder = ReadOptionsBuilder::new();
2370        // enable read page index
2371        let options = builder.with_page_index().build();
2372        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2373        let reader = reader_result.unwrap();
2374
2375        let row_group_reader = reader.get_row_group(0).unwrap();
2376
2377        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
2378        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2379
2380        let mut vec = vec![];
2381
2382        // Step 1: Peek and ensure dictionary page is correctly identified
2383        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2384        assert!(meta.is_dict);
2385
2386        // Step 2: Call skip_next_page to skip the dictionary page
2387        column_page_reader.skip_next_page().unwrap();
2388
2389        // Step 3: Read the next data page after skipping the dictionary page
2390        let page = column_page_reader.get_next_page().unwrap().unwrap();
2391        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2392
2393        // Step 4: Continue reading remaining data pages and verify correctness
2394        for _i in 0..351 {
2395            // 352 total pages, 1 dictionary page is skipped
2396            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2397            assert!(!meta.is_dict); // Verify no dictionary page here
2398            vec.push(meta);
2399
2400            let page = column_page_reader.get_next_page().unwrap().unwrap();
2401            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2402        }
2403
2404        // Step 5: Check if all pages are read
2405        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2406        assert!(column_page_reader.get_next_page().unwrap().is_none());
2407
2408        // Step 6: Verify the number of data pages read (should be 351 data pages)
2409        assert_eq!(vec.len(), 351);
2410    }
2411
2412    #[test]
2413    fn test_skip_page_with_offset_index() {
2414        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2415        let builder = ReadOptionsBuilder::new();
2416        //enable read page index
2417        let options = builder.with_page_index().build();
2418        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2419        let reader = reader_result.unwrap();
2420
2421        let row_group_reader = reader.get_row_group(0).unwrap();
2422
2423        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
2424        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2425
2426        let mut vec = vec![];
2427
2428        for i in 0..325 {
2429            if i % 2 == 0 {
2430                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2431            } else {
2432                column_page_reader.skip_next_page().unwrap();
2433            }
2434        }
2435        //check read all pages.
2436        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2437        assert!(column_page_reader.get_next_page().unwrap().is_none());
2438
2439        assert_eq!(vec.len(), 163);
2440    }
2441
2442    #[test]
2443    fn test_skip_page_without_offset_index() {
2444        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2445
2446        // use default SerializedFileReader without read offsetIndex
2447        let reader_result = SerializedFileReader::new(test_file);
2448        let reader = reader_result.unwrap();
2449
2450        let row_group_reader = reader.get_row_group(0).unwrap();
2451
2452        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
2453        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2454
2455        let mut vec = vec![];
2456
2457        for i in 0..325 {
2458            if i % 2 == 0 {
2459                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2460            } else {
2461                column_page_reader.peek_next_page().unwrap().unwrap();
2462                column_page_reader.skip_next_page().unwrap();
2463            }
2464        }
2465        //check read all pages.
2466        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2467        assert!(column_page_reader.get_next_page().unwrap().is_none());
2468
2469        assert_eq!(vec.len(), 163);
2470    }
2471
2472    #[test]
2473    fn test_peek_page_with_dictionary_page() {
2474        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2475        let builder = ReadOptionsBuilder::new();
2476        //enable read page index
2477        let options = builder.with_page_index().build();
2478        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2479        let reader = reader_result.unwrap();
2480        let row_group_reader = reader.get_row_group(0).unwrap();
2481
2482        //use 'string_col', Boundary order: UNORDERED, total 352 data ages and 1 dictionary page.
2483        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2484
2485        let mut vec = vec![];
2486
2487        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2488        assert!(meta.is_dict);
2489        let page = column_page_reader.get_next_page().unwrap().unwrap();
2490        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2491
2492        for i in 0..352 {
2493            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2494            // have checked with `parquet-tools column-index   -c string_col  ./alltypes_tiny_pages.parquet`
2495            // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
2496            if i != 351 {
2497                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2498            } else {
2499                // last page first row index is 7290, total row count is 7300
2500                // because first row start with zero, last page row count should be 10.
2501                assert_eq!(meta.num_rows, Some(10));
2502            }
2503            assert!(!meta.is_dict);
2504            vec.push(meta);
2505            let page = column_page_reader.get_next_page().unwrap().unwrap();
2506            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2507        }
2508
2509        //check read all pages.
2510        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2511        assert!(column_page_reader.get_next_page().unwrap().is_none());
2512
2513        assert_eq!(vec.len(), 352);
2514    }
2515
2516    #[test]
2517    fn test_peek_page_with_dictionary_page_without_offset_index() {
2518        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2519
2520        let reader_result = SerializedFileReader::new(test_file);
2521        let reader = reader_result.unwrap();
2522        let row_group_reader = reader.get_row_group(0).unwrap();
2523
2524        //use 'string_col', Boundary order: UNORDERED, total 352 data ages and 1 dictionary page.
2525        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2526
2527        let mut vec = vec![];
2528
2529        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2530        assert!(meta.is_dict);
2531        let page = column_page_reader.get_next_page().unwrap().unwrap();
2532        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2533
2534        for i in 0..352 {
2535            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2536            // have checked with `parquet-tools column-index   -c string_col  ./alltypes_tiny_pages.parquet`
2537            // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
2538            if i != 351 {
2539                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2540            } else {
2541                // last page first row index is 7290, total row count is 7300
2542                // because first row start with zero, last page row count should be 10.
2543                assert_eq!(meta.num_levels, Some(10));
2544            }
2545            assert!(!meta.is_dict);
2546            vec.push(meta);
2547            let page = column_page_reader.get_next_page().unwrap().unwrap();
2548            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2549        }
2550
2551        //check read all pages.
2552        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2553        assert!(column_page_reader.get_next_page().unwrap().is_none());
2554
2555        assert_eq!(vec.len(), 352);
2556    }
2557
2558    #[test]
2559    fn test_fixed_length_index() {
2560        let message_type = "
2561        message test_schema {
2562          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2563        }
2564        ";
2565
2566        let schema = parse_message_type(message_type).unwrap();
2567        let mut out = Vec::with_capacity(1024);
2568        let mut writer =
2569            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2570
2571        let mut r = writer.next_row_group().unwrap();
2572        let mut c = r.next_column().unwrap().unwrap();
2573        c.typed::<FixedLenByteArrayType>()
2574            .write_batch(
2575                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2576                Some(&[1, 1, 0, 1]),
2577                None,
2578            )
2579            .unwrap();
2580        c.close().unwrap();
2581        r.close().unwrap();
2582        writer.close().unwrap();
2583
2584        let b = Bytes::from(out);
2585        let options = ReadOptionsBuilder::new().with_page_index().build();
2586        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2587        let index = reader.metadata().column_index().unwrap();
2588
2589        // 1 row group
2590        assert_eq!(index.len(), 1);
2591        let c = &index[0];
2592        // 1 column
2593        assert_eq!(c.len(), 1);
2594
2595        match &c[0] {
2596            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
2597                assert_eq!(v.num_pages(), 1);
2598                assert_eq!(v.null_count(0).unwrap(), 1);
2599                assert_eq!(v.min_value(0).unwrap(), &[0; 11]);
2600                assert_eq!(v.max_value(0).unwrap(), &[5; 11]);
2601            }
2602            _ => unreachable!(),
2603        }
2604    }
2605
2606    #[test]
2607    fn test_multi_gz() {
2608        let file = get_test_file("concatenated_gzip_members.parquet");
2609        let reader = SerializedFileReader::new(file).unwrap();
2610        let row_group_reader = reader.get_row_group(0).unwrap();
2611        match row_group_reader.get_column_reader(0).unwrap() {
2612            ColumnReader::Int64ColumnReader(mut reader) => {
2613                let mut buffer = Vec::with_capacity(1024);
2614                let mut def_levels = Vec::with_capacity(1024);
2615                let (num_records, num_values, num_levels) = reader
2616                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2617                    .unwrap();
2618
2619                assert_eq!(num_records, 513);
2620                assert_eq!(num_values, 513);
2621                assert_eq!(num_levels, 513);
2622
2623                let expected: Vec<i64> = (1..514).collect();
2624                assert_eq!(&buffer, &expected);
2625            }
2626            _ => unreachable!(),
2627        }
2628    }
2629
2630    #[test]
2631    fn test_byte_stream_split_extended() {
2632        let path = format!(
2633            "{}/byte_stream_split_extended.gzip.parquet",
2634            arrow::util::test_util::parquet_test_data(),
2635        );
2636        let file = File::open(path).unwrap();
2637        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2638
2639        // Use full schema as projected schema
2640        let mut iter = reader
2641            .get_row_iter(None)
2642            .expect("Failed to create row iterator");
2643
2644        let mut start = 0;
2645        let end = reader.metadata().file_metadata().num_rows();
2646
2647        let check_row = |row: Result<Row, ParquetError>| {
2648            assert!(row.is_ok());
2649            let r = row.unwrap();
2650            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2651            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2652            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2653            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2654            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2655            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2656            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2657        };
2658
2659        while start < end {
2660            match iter.next() {
2661                Some(row) => check_row(row),
2662                None => break,
2663            };
2664            start += 1;
2665        }
2666    }
2667
2668    #[test]
2669    fn test_filtered_rowgroup_metadata() {
2670        let message_type = "
2671            message test_schema {
2672                REQUIRED INT32 a;
2673            }
2674        ";
2675        let schema = Arc::new(parse_message_type(message_type).unwrap());
2676        let props = Arc::new(
2677            WriterProperties::builder()
2678                .set_statistics_enabled(EnabledStatistics::Page)
2679                .build(),
2680        );
2681        let mut file: File = tempfile::tempfile().unwrap();
2682        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2683        let data = [1, 2, 3, 4, 5];
2684
2685        // write 5 row groups
2686        for idx in 0..5 {
2687            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2688            let mut row_group_writer = file_writer.next_row_group().unwrap();
2689            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2690                writer
2691                    .typed::<Int32Type>()
2692                    .write_batch(data_i.as_slice(), None, None)
2693                    .unwrap();
2694                writer.close().unwrap();
2695            }
2696            row_group_writer.close().unwrap();
2697            file_writer.flushed_row_groups();
2698        }
2699        let file_metadata = file_writer.close().unwrap();
2700
2701        assert_eq!(file_metadata.file_metadata().num_rows(), 25);
2702        assert_eq!(file_metadata.num_row_groups(), 5);
2703
2704        // read only the 3rd row group
2705        let read_options = ReadOptionsBuilder::new()
2706            .with_page_index()
2707            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2708            .build();
2709        let reader =
2710            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2711                .unwrap();
2712        let metadata = reader.metadata();
2713
2714        // check we got the expected row group
2715        assert_eq!(metadata.num_row_groups(), 1);
2716        assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2717
2718        // check we only got the relevant page indexes
2719        assert!(metadata.column_index().is_some());
2720        assert!(metadata.offset_index().is_some());
2721        assert_eq!(metadata.column_index().unwrap().len(), 1);
2722        assert_eq!(metadata.offset_index().unwrap().len(), 1);
2723        let col_idx = metadata.column_index().unwrap();
2724        let off_idx = metadata.offset_index().unwrap();
2725        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2726        let pg_idx = &col_idx[0][0];
2727        let off_idx_i = &off_idx[0][0];
2728
2729        // test that we got the index matching the row group
2730        match pg_idx {
2731            ColumnIndexMetaData::INT32(int_idx) => {
2732                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2733                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2734                assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2735                assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2736            }
2737            _ => panic!("wrong stats type"),
2738        }
2739
2740        // check offset index matches too
2741        assert_eq!(
2742            off_idx_i.page_locations[0].offset,
2743            metadata.row_group(0).column(0).data_page_offset()
2744        );
2745
2746        // read non-contiguous row groups
2747        let read_options = ReadOptionsBuilder::new()
2748            .with_page_index()
2749            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2750            .build();
2751        let reader =
2752            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2753                .unwrap();
2754        let metadata = reader.metadata();
2755
2756        // check we got the expected row groups
2757        assert_eq!(metadata.num_row_groups(), 2);
2758        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2759        assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2760
2761        // check we only got the relevant page indexes
2762        assert!(metadata.column_index().is_some());
2763        assert!(metadata.offset_index().is_some());
2764        assert_eq!(metadata.column_index().unwrap().len(), 2);
2765        assert_eq!(metadata.offset_index().unwrap().len(), 2);
2766        let col_idx = metadata.column_index().unwrap();
2767        let off_idx = metadata.offset_index().unwrap();
2768
2769        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2770            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2771            let pg_idx = &col_idx_i[0];
2772            let off_idx_i = &off_idx[i][0];
2773
2774            // test that we got the index matching the row group
2775            match pg_idx {
2776                ColumnIndexMetaData::INT32(int_idx) => {
2777                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2778                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2779                    assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2780                    assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2781                }
2782                _ => panic!("wrong stats type"),
2783            }
2784
2785            // check offset index matches too
2786            assert_eq!(
2787                off_idx_i.page_locations[0].offset,
2788                metadata.row_group(i).column(0).data_page_offset()
2789            );
2790        }
2791    }
2792
2793    #[test]
2794    fn test_reuse_schema() {
2795        let file = get_test_file("alltypes_plain.parquet");
2796        let file_reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
2797        let schema = file_reader.metadata().file_metadata().schema_descr_ptr();
2798        let expected = file_reader.metadata;
2799
2800        let options = ReadOptionsBuilder::new()
2801            .with_parquet_schema(schema)
2802            .build();
2803        let file_reader = SerializedFileReader::new_with_options(file, options).unwrap();
2804
2805        assert_eq!(expected.as_ref(), file_reader.metadata.as_ref());
2806        // Should have used the same schema instance
2807        assert!(Arc::ptr_eq(
2808            &expected.file_metadata().schema_descr_ptr(),
2809            &file_reader.metadata.file_metadata().schema_descr_ptr()
2810        ));
2811    }
2812
2813    #[test]
2814    fn test_read_unknown_logical_type() {
2815        let file = get_test_file("unknown-logical-type.parquet");
2816        let reader = SerializedFileReader::new(file).expect("Error opening file");
2817
2818        let schema = reader.metadata().file_metadata().schema_descr();
2819        assert_eq!(
2820            schema.column(0).logical_type_ref(),
2821            Some(&basic::LogicalType::String)
2822        );
2823        assert_eq!(
2824            schema.column(1).logical_type_ref(),
2825            Some(&basic::LogicalType::_Unknown { field_id: 2555 })
2826        );
2827        assert_eq!(schema.column(1).physical_type(), Type::BYTE_ARRAY);
2828
2829        let mut iter = reader
2830            .get_row_iter(None)
2831            .expect("Failed to create row iterator");
2832
2833        let mut num_rows = 0;
2834        while iter.next().is_some() {
2835            num_rows += 1;
2836        }
2837        assert_eq!(num_rows, reader.metadata().file_metadata().num_rows());
2838    }
2839}