Skip to main content

parquet/file/
serialized_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
19//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
20
21use crate::basic::{PageType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{Codec, create_codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{CryptoContext, read_and_decrypt};
27use crate::errors::{ParquetError, Result};
28use crate::file::metadata::thrift::PageHeader;
29use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
30use crate::file::statistics;
31use crate::file::{
32    metadata::*,
33    properties::{ReaderProperties, ReaderPropertiesPtr},
34    reader::*,
35};
36#[cfg(feature = "encryption")]
37use crate::parquet_thrift::ThriftSliceInputProtocol;
38use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
39use crate::record::Row;
40use crate::record::reader::RowIter;
41use crate::schema::types::{SchemaDescPtr, Type as SchemaType};
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::{fs::File, io::Read, path::Path, sync::Arc};
45
46impl TryFrom<File> for SerializedFileReader<File> {
47    type Error = ParquetError;
48
49    fn try_from(file: File) -> Result<Self> {
50        Self::new(file)
51    }
52}
53
54impl TryFrom<&Path> for SerializedFileReader<File> {
55    type Error = ParquetError;
56
57    fn try_from(path: &Path) -> Result<Self> {
58        let file = File::open(path)?;
59        Self::try_from(file)
60    }
61}
62
63impl TryFrom<String> for SerializedFileReader<File> {
64    type Error = ParquetError;
65
66    fn try_from(path: String) -> Result<Self> {
67        Self::try_from(Path::new(&path))
68    }
69}
70
71impl TryFrom<&str> for SerializedFileReader<File> {
72    type Error = ParquetError;
73
74    fn try_from(path: &str) -> Result<Self> {
75        Self::try_from(Path::new(&path))
76    }
77}
78
79/// Conversion into a [`RowIter`]
80/// using the full file schema over all row groups.
81impl IntoIterator for SerializedFileReader<File> {
82    type Item = Result<Row>;
83    type IntoIter = RowIter<'static>;
84
85    fn into_iter(self) -> Self::IntoIter {
86        RowIter::from_file_into(Box::new(self))
87    }
88}
89
90// ----------------------------------------------------------------------
91// Implementations of file & row group readers
92
/// A serialized implementation for Parquet [`FileReader`].
///
/// All state is held behind [`Arc`]s so that the row group readers created by
/// this reader can share the underlying chunk reader and properties.
pub struct SerializedFileReader<R: ChunkReader> {
    /// Source of the raw file bytes
    chunk_reader: Arc<R>,
    /// Parsed file metadata (restricted to the row groups kept by any predicates)
    metadata: Arc<ParquetMetaData>,
    /// Reader configuration handed to row group and page readers
    props: ReaderPropertiesPtr,
}
99
/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned.
///
/// The predicate is an `FnMut`, so it may carry and update its own state
/// between invocations.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
104
/// A builder for [`ReadOptions`].
/// For the predicates that are added to the builder,
/// they will be chained using 'AND' to filter the row groups.
#[derive(Default)]
pub struct ReadOptionsBuilder {
    /// Row group predicates, in the order they were added
    predicates: Vec<ReadGroupPredicate>,
    /// Whether the page index structures should be read
    enable_page_index: bool,
    /// Explicit reader properties; defaults are used at build time when `None`
    props: Option<ReaderProperties>,
    /// Options controlling how the file metadata is decoded
    metadata_options: ParquetMetaDataOptions,
}
115
116impl ReadOptionsBuilder {
117    /// New builder
118    pub fn new() -> Self {
119        Self::default()
120    }
121
122    /// Add a predicate on row group metadata to the reading option,
123    /// Filter only row groups that match the predicate criteria
124    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
125        self.predicates.push(predicate);
126        self
127    }
128
129    /// Add a range predicate on filtering row groups if their midpoints are within
130    /// the Closed-Open range `[start..end) {x | start <= x < end}`
131    pub fn with_range(mut self, start: i64, end: i64) -> Self {
132        assert!(start < end);
133        let predicate = move |rg: &RowGroupMetaData, _: usize| {
134            let mid = get_midpoint_offset(rg);
135            mid >= start && mid < end
136        };
137        self.predicates.push(Box::new(predicate));
138        self
139    }
140
141    /// Enable reading the page index structures described in
142    /// "[Column Index] Layout to Support Page Skipping"
143    ///
144    /// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
145    pub fn with_page_index(mut self) -> Self {
146        self.enable_page_index = true;
147        self
148    }
149
150    /// Set the [`ReaderProperties`] configuration.
151    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
152        self.props = Some(properties);
153        self
154    }
155
156    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
157    /// footer will be skipped.
158    pub fn with_parquet_schema(mut self, schema: SchemaDescPtr) -> Self {
159        self.metadata_options.set_schema(schema);
160        self
161    }
162
163    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
164    /// (defaults to `false`).
165    ///
166    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
167    /// might be desirable.
168    ///
169    /// [`encoding_stats`]:
170    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
171    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
172        self.metadata_options.set_encoding_stats_as_mask(val);
173        self
174    }
175
176    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
177    ///
178    /// [`encoding_stats`]:
179    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
180    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
181        self.metadata_options.set_encoding_stats_policy(policy);
182        self
183    }
184
185    /// Sets the decoding policy for [`statistics`] in the Parquet `ColumnMetaData`.
186    ///
187    /// [`statistics`]:
188    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912
189    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
190        self.metadata_options.set_column_stats_policy(policy);
191        self
192    }
193
194    /// Sets the decoding policy for [`size_statistics`] in the Parquet `ColumnMetaData`.
195    ///
196    /// [`size_statistics`]:
197    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936
198    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
199        self.metadata_options.set_size_stats_policy(policy);
200        self
201    }
202
203    /// Seal the builder and return the read options
204    pub fn build(self) -> ReadOptions {
205        let props = self
206            .props
207            .unwrap_or_else(|| ReaderProperties::builder().build());
208        ReadOptions {
209            predicates: self.predicates,
210            enable_page_index: self.enable_page_index,
211            props,
212            metadata_options: self.metadata_options,
213        }
214    }
215}
216
/// A collection of options for reading a Parquet file.
///
/// Predicates are currently only supported on row group metadata.
/// All predicates will be chained using 'AND' to filter the row groups.
///
/// Instances are created with [`ReadOptionsBuilder`].
pub struct ReadOptions {
    /// Row group predicates; a row group is kept only if all of them accept it
    predicates: Vec<ReadGroupPredicate>,
    /// Whether the page index structures should be read
    enable_page_index: bool,
    /// Reader configuration handed to the file reader
    props: ReaderProperties,
    /// Options controlling how the file metadata is decoded
    metadata_options: ParquetMetaDataOptions,
}
227
228impl<R: 'static + ChunkReader> SerializedFileReader<R> {
229    /// Creates file reader from a Parquet file.
230    /// Returns an error if the Parquet file does not exist or is corrupt.
231    pub fn new(chunk_reader: R) -> Result<Self> {
232        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
233        let props = Arc::new(ReaderProperties::builder().build());
234        Ok(Self {
235            chunk_reader: Arc::new(chunk_reader),
236            metadata: Arc::new(metadata),
237            props,
238        })
239    }
240
241    /// Creates file reader from a Parquet file with read options.
242    /// Returns an error if the Parquet file does not exist or is corrupt.
243    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
244        let mut metadata_builder = ParquetMetaDataReader::new()
245            .with_metadata_options(Some(options.metadata_options.clone()))
246            .parse_and_finish(&chunk_reader)?
247            .into_builder();
248        let mut predicates = options.predicates;
249
250        // Filter row groups based on the predicates
251        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
252            let mut keep = true;
253            for predicate in &mut predicates {
254                if !predicate(&rg_meta, i) {
255                    keep = false;
256                    break;
257                }
258            }
259            if keep {
260                metadata_builder = metadata_builder.add_row_group(rg_meta);
261            }
262        }
263
264        let mut metadata = metadata_builder.build();
265
266        // If page indexes are desired, build them with the filtered set of row groups
267        if options.enable_page_index {
268            let mut reader = ParquetMetaDataReader::new_with_metadata(metadata)
269                .with_page_index_policy(PageIndexPolicy::Required);
270            reader.read_page_indexes(&chunk_reader)?;
271            metadata = reader.finish()?;
272        }
273
274        Ok(Self {
275            chunk_reader: Arc::new(chunk_reader),
276            metadata: Arc::new(metadata),
277            props: Arc::new(options.props),
278        })
279    }
280}
281
282/// Get midpoint offset for a row group
283fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
284    let col = meta.column(0);
285    let mut offset = col.data_page_offset();
286    if let Some(dic_offset) = col.dictionary_page_offset() {
287        if offset > dic_offset {
288            offset = dic_offset
289        }
290    };
291    offset + meta.compressed_size() / 2
292}
293
294impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
295    fn metadata(&self) -> &ParquetMetaData {
296        &self.metadata
297    }
298
299    fn num_row_groups(&self) -> usize {
300        self.metadata.num_row_groups()
301    }
302
303    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
304        let row_group_metadata = self.metadata.row_group(i);
305        // Row groups should be processed sequentially.
306        let props = Arc::clone(&self.props);
307        let f = Arc::clone(&self.chunk_reader);
308        Ok(Box::new(SerializedRowGroupReader::new(
309            f,
310            row_group_metadata,
311            self.metadata.offset_index().map(|x| x[i].as_slice()),
312            props,
313        )?))
314    }
315
316    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
317        RowIter::from_file(projection, self)
318    }
319}
320
/// A serialized implementation for Parquet [`RowGroupReader`].
///
/// Borrows the row group metadata (and offset index, if any) for lifetime `'a`.
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    /// Source of the raw file bytes
    chunk_reader: Arc<R>,
    /// Metadata of the row group being read
    metadata: &'a RowGroupMetaData,
    /// Offset index entries of this row group, indexed by column, if available
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Reader configuration handed to page readers
    props: ReaderPropertiesPtr,
    /// One entry per column; all `None` when bloom filter reading is disabled
    bloom_filters: Vec<Option<Sbbf>>,
}
329
330impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
331    /// Creates new row group reader from a file, row group metadata and custom config.
332    pub fn new(
333        chunk_reader: Arc<R>,
334        metadata: &'a RowGroupMetaData,
335        offset_index: Option<&'a [OffsetIndexMetaData]>,
336        props: ReaderPropertiesPtr,
337    ) -> Result<Self> {
338        let bloom_filters = if props.read_bloom_filter() {
339            metadata
340                .columns()
341                .iter()
342                .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
343                .collect::<Result<Vec<_>>>()?
344        } else {
345            std::iter::repeat_n(None, metadata.columns().len()).collect()
346        };
347        Ok(Self {
348            chunk_reader,
349            metadata,
350            offset_index,
351            props,
352            bloom_filters,
353        })
354    }
355}
356
357impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
358    fn metadata(&self) -> &RowGroupMetaData {
359        self.metadata
360    }
361
362    fn num_columns(&self) -> usize {
363        self.metadata.num_columns()
364    }
365
366    // TODO: fix PARQUET-816
367    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
368        let col = self.metadata.column(i);
369
370        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
371
372        let props = Arc::clone(&self.props);
373        Ok(Box::new(SerializedPageReader::new_with_properties(
374            Arc::clone(&self.chunk_reader),
375            col,
376            usize::try_from(self.metadata.num_rows())?,
377            page_locations,
378            props,
379        )?))
380    }
381
382    /// get bloom filter for the `i`th column
383    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
384        self.bloom_filters[i].as_ref()
385    }
386
387    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
388        RowIter::from_row_group(projection, self)
389    }
390}
391
392/// Decodes a [`Page`] from the provided `buffer`
393pub(crate) fn decode_page(
394    page_header: PageHeader,
395    buffer: Bytes,
396    physical_type: Type,
397    decompressor: Option<&mut Box<dyn Codec>>,
398) -> Result<Page> {
399    // Verify the 32-bit CRC checksum of the page
400    #[cfg(feature = "crc")]
401    if let Some(expected_crc) = page_header.crc {
402        let crc = crc32fast::hash(&buffer);
403        if crc != expected_crc as u32 {
404            return Err(general_err!("Page CRC checksum mismatch"));
405        }
406    }
407
408    // When processing data page v2, depending on enabled compression for the
409    // page, we should account for uncompressed data ('offset') of
410    // repetition and definition levels.
411    //
412    // We always use 0 offset for other pages other than v2, `true` flag means
413    // that compression will be applied if decompressor is defined
414    let mut offset: usize = 0;
415    let mut can_decompress = true;
416
417    if let Some(ref header_v2) = page_header.data_page_header_v2 {
418        if header_v2.definition_levels_byte_length < 0
419            || header_v2.repetition_levels_byte_length < 0
420            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
421                > page_header.uncompressed_page_size
422        {
423            return Err(general_err!(
424                "DataPage v2 header contains implausible values \
425                    for definition_levels_byte_length ({}) \
426                    and repetition_levels_byte_length ({}) \
427                    given DataPage header provides uncompressed_page_size ({})",
428                header_v2.definition_levels_byte_length,
429                header_v2.repetition_levels_byte_length,
430                page_header.uncompressed_page_size
431            ));
432        }
433        offset = usize::try_from(
434            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
435        )?;
436        // When is_compressed flag is missing the page is considered compressed
437        can_decompress = header_v2.is_compressed.unwrap_or(true);
438    }
439
440    let buffer = match decompressor {
441        Some(decompressor) if can_decompress => {
442            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
443            if offset > buffer.len() || offset > uncompressed_page_size {
444                return Err(general_err!("Invalid page header"));
445            }
446            let decompressed_size = uncompressed_page_size - offset;
447            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
448            decompressed.extend_from_slice(&buffer[..offset]);
449            // decompressed size of zero corresponds to a page with no non-null values
450            // see https://github.com/apache/parquet-format/blob/master/README.md#data-pages
451            if decompressed_size > 0 {
452                let compressed = &buffer[offset..];
453                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
454            }
455
456            if decompressed.len() != uncompressed_page_size {
457                return Err(general_err!(
458                    "Actual decompressed size doesn't match the expected one ({} vs {})",
459                    decompressed.len(),
460                    uncompressed_page_size
461                ));
462            }
463
464            Bytes::from(decompressed)
465        }
466        _ => buffer,
467    };
468
469    let result = match page_header.r#type {
470        PageType::DICTIONARY_PAGE => {
471            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
472                ParquetError::General("Missing dictionary page header".to_string())
473            })?;
474            let is_sorted = dict_header.is_sorted.unwrap_or(false);
475            Page::DictionaryPage {
476                buf: buffer,
477                num_values: dict_header.num_values.try_into()?,
478                encoding: dict_header.encoding,
479                is_sorted,
480            }
481        }
482        PageType::DATA_PAGE => {
483            let header = page_header
484                .data_page_header
485                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
486            Page::DataPage {
487                buf: buffer,
488                num_values: header.num_values.try_into()?,
489                encoding: header.encoding,
490                def_level_encoding: header.definition_level_encoding,
491                rep_level_encoding: header.repetition_level_encoding,
492                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
493            }
494        }
495        PageType::DATA_PAGE_V2 => {
496            let header = page_header
497                .data_page_header_v2
498                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
499            let is_compressed = header.is_compressed.unwrap_or(true);
500            Page::DataPageV2 {
501                buf: buffer,
502                num_values: header.num_values.try_into()?,
503                encoding: header.encoding,
504                num_nulls: header.num_nulls.try_into()?,
505                num_rows: header.num_rows.try_into()?,
506                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
507                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
508                is_compressed,
509                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
510            }
511        }
512        _ => {
513            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
514            return Err(general_err!(
515                "Page type {:?} is not supported",
516                page_header.r#type
517            ));
518        }
519    };
520
521    Ok(result)
522}
523
/// Position tracking for a [`SerializedPageReader`].
///
/// `Values` is used when no page locations were supplied and pages are located by
/// reading their headers one after another; `Pages` is used when page locations
/// are available.
enum SerializedPageReaderState {
    Values {
        /// The current byte offset in the reader
        /// Note that offset is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        offset: u64,

        /// The length of the chunk in bytes
        /// Note that remaining_bytes is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        remaining_bytes: u64,

        /// If the next page header has already been "peeked", it is cached here
        next_page_header: Option<Box<PageHeader>>,

        /// The index of the data page within this column chunk
        page_index: usize,

        /// Whether the next page is expected to be a dictionary page
        require_dictionary: bool,
    },
    Pages {
        /// Remaining page locations
        page_locations: VecDeque<PageLocation>,
        /// Remaining dictionary location if any
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
        /// The index of the data page within this column chunk
        page_index: usize,
    },
}
554
/// Settings that influence how page headers (and, with the `encryption` feature,
/// page data) are read. The default has statistics decoding disabled and no
/// crypto context.
#[derive(Default)]
struct SerializedPageReaderContext {
    /// Controls decoding of page-level statistics
    read_stats: bool,
    /// Crypto context carrying objects required for decryption
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
563
/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader
    reader: Arc<R>,

    /// The compression codec for this column chunk. Only set for non-PLAIN codec.
    decompressor: Option<Box<dyn Codec>>,

    /// Column chunk type.
    physical_type: Type,

    /// Where the reader currently is within the column chunk
    state: SerializedPageReaderState,

    /// Settings for reading page headers (statistics decoding, decryption)
    context: SerializedPageReaderContext,
}
579
580impl<R: ChunkReader> SerializedPageReader<R> {
581    /// Creates a new serialized page reader from a chunk reader and metadata
582    pub fn new(
583        reader: Arc<R>,
584        column_chunk_metadata: &ColumnChunkMetaData,
585        total_rows: usize,
586        page_locations: Option<Vec<PageLocation>>,
587    ) -> Result<Self> {
588        let props = Arc::new(ReaderProperties::builder().build());
589        SerializedPageReader::new_with_properties(
590            reader,
591            column_chunk_metadata,
592            total_rows,
593            page_locations,
594            props,
595        )
596    }
597
598    /// Stub No-op implementation when encryption is disabled.
599    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
600    pub(crate) fn add_crypto_context(
601        self,
602        _rg_idx: usize,
603        _column_idx: usize,
604        _parquet_meta_data: &ParquetMetaData,
605        _column_chunk_metadata: &ColumnChunkMetaData,
606    ) -> Result<SerializedPageReader<R>> {
607        Ok(self)
608    }
609
610    /// Adds any necessary crypto context to this page reader, if encryption is enabled.
611    #[cfg(feature = "encryption")]
612    pub(crate) fn add_crypto_context(
613        mut self,
614        rg_idx: usize,
615        column_idx: usize,
616        parquet_meta_data: &ParquetMetaData,
617        column_chunk_metadata: &ColumnChunkMetaData,
618    ) -> Result<SerializedPageReader<R>> {
619        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
620            return Ok(self);
621        };
622        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
623            return Ok(self);
624        };
625        let crypto_context =
626            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
627        self.context.crypto_context = Some(Arc::new(crypto_context));
628        Ok(self)
629    }
630
631    /// Creates a new serialized page with custom options.
632    pub fn new_with_properties(
633        reader: Arc<R>,
634        meta: &ColumnChunkMetaData,
635        total_rows: usize,
636        page_locations: Option<Vec<PageLocation>>,
637        props: ReaderPropertiesPtr,
638    ) -> Result<Self> {
639        let decompressor = create_codec(meta.compression(), props.codec_options())?;
640        let (start, len) = meta.byte_range();
641
642        let state = match page_locations {
643            Some(locations) => {
644                // If the offset of the first page doesn't match the start of the column chunk
645                // then the preceding space must contain a dictionary page.
646                let dictionary_page = match locations.first() {
647                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
648                        offset: start as i64,
649                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
650                        first_row_index: 0,
651                    }),
652                    _ => None,
653                };
654
655                SerializedPageReaderState::Pages {
656                    page_locations: locations.into(),
657                    dictionary_page,
658                    total_rows,
659                    page_index: 0,
660                }
661            }
662            None => SerializedPageReaderState::Values {
663                offset: start,
664                remaining_bytes: len,
665                next_page_header: None,
666                page_index: 0,
667                require_dictionary: meta.dictionary_page_offset().is_some(),
668            },
669        };
670        let mut context = SerializedPageReaderContext::default();
671        if props.read_page_stats() {
672            context.read_stats = true;
673        }
674        Ok(Self {
675            reader,
676            decompressor,
677            state,
678            physical_type: meta.column_type(),
679            context,
680        })
681    }
682
683    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
684    /// Unlike page metadata, an offset can uniquely identify a page.
685    ///
686    /// This is used when we need to read parquet with row-filter, and we don't want to decompress the page twice.
687    /// This function allows us to check if the next page is being cached or read previously.
688    #[cfg(test)]
689    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
690        match &mut self.state {
691            SerializedPageReaderState::Values {
692                offset,
693                remaining_bytes,
694                next_page_header,
695                page_index,
696                require_dictionary,
697            } => {
698                loop {
699                    if *remaining_bytes == 0 {
700                        return Ok(None);
701                    }
702                    return if let Some(header) = next_page_header.as_ref() {
703                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
704                            Ok(Some(*offset))
705                        } else {
706                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
707                            *next_page_header = None;
708                            continue;
709                        }
710                    } else {
711                        let mut read = self.reader.get_read(*offset)?;
712                        let (header_len, header) = Self::read_page_header_len(
713                            &self.context,
714                            &mut read,
715                            *page_index,
716                            *require_dictionary,
717                        )?;
718                        *offset += header_len as u64;
719                        *remaining_bytes -= header_len as u64;
720                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
721                            Ok(Some(*offset))
722                        } else {
723                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
724                            continue;
725                        };
726                        *next_page_header = Some(Box::new(header));
727                        page_meta
728                    };
729                }
730            }
731            SerializedPageReaderState::Pages {
732                page_locations,
733                dictionary_page,
734                ..
735            } => {
736                if let Some(page) = dictionary_page {
737                    Ok(Some(page.offset as u64))
738                } else if let Some(page) = page_locations.front() {
739                    Ok(Some(page.offset as u64))
740                } else {
741                    Ok(None)
742                }
743            }
744        }
745    }
746
747    fn read_page_header_len<T: Read>(
748        context: &SerializedPageReaderContext,
749        input: &mut T,
750        page_index: usize,
751        dictionary_page: bool,
752    ) -> Result<(usize, PageHeader)> {
753        /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
754        struct TrackedRead<R> {
755            inner: R,
756            bytes_read: usize,
757        }
758
759        impl<R: Read> Read for TrackedRead<R> {
760            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
761                let v = self.inner.read(buf)?;
762                self.bytes_read += v;
763                Ok(v)
764            }
765        }
766
767        let mut tracked = TrackedRead {
768            inner: input,
769            bytes_read: 0,
770        };
771        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
772        Ok((tracked.bytes_read, header))
773    }
774
775    fn read_page_header_len_from_bytes(
776        context: &SerializedPageReaderContext,
777        buffer: &[u8],
778        page_index: usize,
779        dictionary_page: bool,
780    ) -> Result<(usize, PageHeader)> {
781        let mut input = std::io::Cursor::new(buffer);
782        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
783        let header_len = input.position() as usize;
784        Ok((header_len, header))
785    }
786}
787
788#[cfg(not(feature = "encryption"))]
789impl SerializedPageReaderContext {
790    fn read_page_header<T: Read>(
791        &self,
792        input: &mut T,
793        _page_index: usize,
794        _dictionary_page: bool,
795    ) -> Result<PageHeader> {
796        let mut prot = ThriftReadInputProtocol::new(input);
797        if self.read_stats {
798            Ok(PageHeader::read_thrift(&mut prot)?)
799        } else {
800            Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
801        }
802    }
803
804    fn decrypt_page_data<T>(
805        &self,
806        buffer: T,
807        _page_index: usize,
808        _dictionary_page: bool,
809    ) -> Result<T> {
810        Ok(buffer)
811    }
812}
813
#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    /// Reads a page header from `input`, decrypting it first when a crypto
    /// context applies to the page.
    ///
    /// * Without a crypto context the header is decoded straight from `input`.
    /// * With a crypto context the header bytes are read and decrypted using
    ///   the page-header AAD, then decoded from the decrypted buffer.
    ///
    /// In both cases `PageHeader::read_thrift` is used when `self.read_stats`
    /// is set and `PageHeader::read_thrift_without_stats` otherwise.
    ///
    /// # Errors
    /// Returns a `ParquetError::General` naming the column ordinal when
    /// decryption fails, and propagates AAD-creation and Thrift decoding
    /// errors.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                let mut prot = ThriftReadInputProtocol::new(input);
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                // The underlying error is intentionally replaced by a message
                // that points at the most likely cause.
                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
        }
    }

    /// Decrypts the page body in `buffer` when a crypto context applies to the
    /// page; otherwise hands `buffer` back unchanged.
    ///
    /// The decrypted bytes are converted back into `T` via `T::from`.
    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    /// Derives the crypto context for one specific page.
    ///
    /// Returns `None` when `self.crypto_context` is unset. Otherwise the
    /// context is specialised with `for_dictionary_page()` for a dictionary
    /// page, or with `with_page_ordinal(page_index)` for any other page, and
    /// wrapped in a fresh `Arc`.
    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}
884
885impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
886    type Item = Result<Page>;
887
888    fn next(&mut self) -> Option<Self::Item> {
889        self.get_next_page().transpose()
890    }
891}
892
893fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
894    if header_len as u64 > remaining_bytes {
895        return Err(eof_err!("Invalid page header"));
896    }
897    Ok(())
898}
899
900fn verify_page_size(
901    compressed_size: i32,
902    uncompressed_size: i32,
903    remaining_bytes: u64,
904) -> Result<()> {
905    // The page's compressed size should not exceed the remaining bytes that are
906    // available to read. The page's uncompressed size is the expected size
907    // after decompression, which can never be negative.
908    if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
909        return Err(eof_err!("Invalid page header"));
910    }
911    Ok(())
912}
913
914impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
915    fn get_next_page(&mut self) -> Result<Option<Page>> {
916        loop {
917            let page = match &mut self.state {
918                SerializedPageReaderState::Values {
919                    offset,
920                    remaining_bytes: remaining,
921                    next_page_header,
922                    page_index,
923                    require_dictionary,
924                } => {
925                    if *remaining == 0 {
926                        return Ok(None);
927                    }
928
929                    let mut read = self.reader.get_read(*offset)?;
930                    let header = if let Some(header) = next_page_header.take() {
931                        *header
932                    } else {
933                        let (header_len, header) = Self::read_page_header_len(
934                            &self.context,
935                            &mut read,
936                            *page_index,
937                            *require_dictionary,
938                        )?;
939                        verify_page_header_len(header_len, *remaining)?;
940                        *offset += header_len as u64;
941                        *remaining -= header_len as u64;
942                        header
943                    };
944                    verify_page_size(
945                        header.compressed_page_size,
946                        header.uncompressed_page_size,
947                        *remaining,
948                    )?;
949                    let data_len = header.compressed_page_size as usize;
950                    let data_start = *offset;
951                    *offset += data_len as u64;
952                    *remaining -= data_len as u64;
953
954                    if header.r#type == PageType::INDEX_PAGE {
955                        continue;
956                    }
957
958                    let buffer = self.reader.get_bytes(data_start, data_len)?;
959
960                    let buffer =
961                        self.context
962                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;
963
964                    let page = decode_page(
965                        header,
966                        buffer,
967                        self.physical_type,
968                        self.decompressor.as_mut(),
969                    )?;
970                    if page.is_data_page() {
971                        *page_index += 1;
972                    } else if page.is_dictionary_page() {
973                        *require_dictionary = false;
974                    }
975                    page
976                }
977                SerializedPageReaderState::Pages {
978                    page_locations,
979                    dictionary_page,
980                    page_index,
981                    ..
982                } => {
983                    let (front, is_dictionary_page) = match dictionary_page.take() {
984                        Some(front) => (front, true),
985                        None => match page_locations.pop_front() {
986                            Some(front) => (front, false),
987                            None => return Ok(None),
988                        },
989                    };
990
991                    let page_len = usize::try_from(front.compressed_page_size)?;
992                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;
993
994                    let (offset, header) = Self::read_page_header_len_from_bytes(
995                        &self.context,
996                        buffer.as_ref(),
997                        *page_index,
998                        is_dictionary_page,
999                    )?;
1000                    let bytes = buffer.slice(offset..);
1001                    let bytes =
1002                        self.context
1003                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;
1004
1005                    if !is_dictionary_page {
1006                        *page_index += 1;
1007                    }
1008                    decode_page(
1009                        header,
1010                        bytes,
1011                        self.physical_type,
1012                        self.decompressor.as_mut(),
1013                    )?
1014                }
1015            };
1016
1017            return Ok(Some(page));
1018        }
1019    }
1020
1021    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
1022        match &mut self.state {
1023            SerializedPageReaderState::Values {
1024                offset,
1025                remaining_bytes,
1026                next_page_header,
1027                page_index,
1028                require_dictionary,
1029            } => {
1030                loop {
1031                    if *remaining_bytes == 0 {
1032                        return Ok(None);
1033                    }
1034                    return if let Some(header) = next_page_header.as_ref() {
1035                        if let Ok(page_meta) = (&**header).try_into() {
1036                            Ok(Some(page_meta))
1037                        } else {
1038                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
1039                            *next_page_header = None;
1040                            continue;
1041                        }
1042                    } else {
1043                        let mut read = self.reader.get_read(*offset)?;
1044                        let (header_len, header) = Self::read_page_header_len(
1045                            &self.context,
1046                            &mut read,
1047                            *page_index,
1048                            *require_dictionary,
1049                        )?;
1050                        verify_page_header_len(header_len, *remaining_bytes)?;
1051                        *offset += header_len as u64;
1052                        *remaining_bytes -= header_len as u64;
1053                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
1054                            Ok(Some(page_meta))
1055                        } else {
1056                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
1057                            continue;
1058                        };
1059                        *next_page_header = Some(Box::new(header));
1060                        page_meta
1061                    };
1062                }
1063            }
1064            SerializedPageReaderState::Pages {
1065                page_locations,
1066                dictionary_page,
1067                total_rows,
1068                page_index: _,
1069            } => {
1070                if dictionary_page.is_some() {
1071                    Ok(Some(PageMetadata {
1072                        num_rows: None,
1073                        num_levels: None,
1074                        is_dict: true,
1075                    }))
1076                } else if let Some(page) = page_locations.front() {
1077                    let next_rows = page_locations
1078                        .get(1)
1079                        .map(|x| x.first_row_index as usize)
1080                        .unwrap_or(*total_rows);
1081
1082                    Ok(Some(PageMetadata {
1083                        num_rows: Some(next_rows - page.first_row_index as usize),
1084                        num_levels: None,
1085                        is_dict: false,
1086                    }))
1087                } else {
1088                    Ok(None)
1089                }
1090            }
1091        }
1092    }
1093
1094    fn skip_next_page(&mut self) -> Result<()> {
1095        match &mut self.state {
1096            SerializedPageReaderState::Values {
1097                offset,
1098                remaining_bytes,
1099                next_page_header,
1100                page_index,
1101                require_dictionary,
1102            } => {
1103                if let Some(buffered_header) = next_page_header.take() {
1104                    verify_page_size(
1105                        buffered_header.compressed_page_size,
1106                        buffered_header.uncompressed_page_size,
1107                        *remaining_bytes,
1108                    )?;
1109                    // The next page header has already been peeked, so just advance the offset
1110                    *offset += buffered_header.compressed_page_size as u64;
1111                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
1112                } else {
1113                    let mut read = self.reader.get_read(*offset)?;
1114                    let (header_len, header) = Self::read_page_header_len(
1115                        &self.context,
1116                        &mut read,
1117                        *page_index,
1118                        *require_dictionary,
1119                    )?;
1120                    verify_page_header_len(header_len, *remaining_bytes)?;
1121                    verify_page_size(
1122                        header.compressed_page_size,
1123                        header.uncompressed_page_size,
1124                        *remaining_bytes,
1125                    )?;
1126                    let data_page_size = header.compressed_page_size as u64;
1127                    *offset += header_len as u64 + data_page_size;
1128                    *remaining_bytes -= header_len as u64 + data_page_size;
1129                }
1130                if *require_dictionary {
1131                    *require_dictionary = false;
1132                } else {
1133                    *page_index += 1;
1134                }
1135                Ok(())
1136            }
1137            SerializedPageReaderState::Pages {
1138                page_locations,
1139                dictionary_page,
1140                page_index,
1141                ..
1142            } => {
1143                if dictionary_page.is_some() {
1144                    // If a dictionary page exists, consume it by taking it (sets to None)
1145                    dictionary_page.take();
1146                } else {
1147                    // If no dictionary page exists, simply pop the data page from page_locations
1148                    if page_locations.pop_front().is_some() {
1149                        *page_index += 1;
1150                    }
1151                }
1152
1153                Ok(())
1154            }
1155        }
1156    }
1157
1158    fn at_record_boundary(&mut self) -> Result<bool> {
1159        match &mut self.state {
1160            SerializedPageReaderState::Values { .. } => match self.peek_next_page()? {
1161                None => Ok(true),
1162                // V2 data pages must start at record boundaries per the parquet
1163                // spec, so the current page ends at one.
1164                Some(metadata) => Ok(metadata.num_rows.is_some()),
1165            },
1166            SerializedPageReaderState::Pages { .. } => Ok(true),
1167        }
1168    }
1169}
1170
1171#[cfg(test)]
1172mod tests {
1173    use std::collections::HashSet;
1174
1175    use bytes::Buf;
1176
1177    use crate::file::page_index::column_index::{
1178        ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
1179    };
1180    use crate::file::properties::{EnabledStatistics, WriterProperties};
1181
1182    use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
1183    use crate::column::reader::ColumnReader;
1184    use crate::data_type::private::ParquetValueType;
1185    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1186    use crate::file::metadata::thrift::DataPageHeaderV2;
1187    use crate::file::writer::SerializedFileWriter;
1188    use crate::record::RowAccessor;
1189    use crate::schema::parser::parse_message_type;
1190    use crate::util::test_common::file_util::{get_test_file, get_test_path};
1191
1192    use super::*;
1193
1194    #[test]
1195    fn test_decode_page_invalid_offset() {
1196        let page_header = PageHeader {
1197            r#type: PageType::DATA_PAGE_V2,
1198            uncompressed_page_size: 10,
1199            compressed_page_size: 10,
1200            data_page_header: None,
1201            index_page_header: None,
1202            dictionary_page_header: None,
1203            crc: None,
1204            data_page_header_v2: Some(DataPageHeaderV2 {
1205                num_nulls: 0,
1206                num_rows: 0,
1207                num_values: 0,
1208                encoding: Encoding::PLAIN,
1209                definition_levels_byte_length: 11,
1210                repetition_levels_byte_length: 0,
1211                is_compressed: None,
1212                statistics: None,
1213            }),
1214        };
1215
1216        let buffer = Bytes::new();
1217        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1218        assert!(
1219            err.to_string()
1220                .contains("DataPage v2 header contains implausible values")
1221        );
1222    }
1223
1224    #[test]
1225    fn test_decode_unsupported_page() {
1226        let mut page_header = PageHeader {
1227            r#type: PageType::INDEX_PAGE,
1228            uncompressed_page_size: 10,
1229            compressed_page_size: 10,
1230            data_page_header: None,
1231            index_page_header: None,
1232            dictionary_page_header: None,
1233            crc: None,
1234            data_page_header_v2: None,
1235        };
1236        let buffer = Bytes::new();
1237        let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
1238        assert_eq!(
1239            err.to_string(),
1240            "Parquet error: Page type INDEX_PAGE is not supported"
1241        );
1242
1243        page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
1244            num_nulls: 0,
1245            num_rows: 0,
1246            num_values: 0,
1247            encoding: Encoding::PLAIN,
1248            definition_levels_byte_length: 11,
1249            repetition_levels_byte_length: 0,
1250            is_compressed: None,
1251            statistics: None,
1252        });
1253        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1254        assert!(
1255            err.to_string()
1256                .contains("DataPage v2 header contains implausible values")
1257        );
1258    }
1259
1260    #[test]
1261    fn test_cursor_and_file_has_the_same_behaviour() {
1262        let mut buf: Vec<u8> = Vec::new();
1263        get_test_file("alltypes_plain.parquet")
1264            .read_to_end(&mut buf)
1265            .unwrap();
1266        let cursor = Bytes::from(buf);
1267        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
1268
1269        let test_file = get_test_file("alltypes_plain.parquet");
1270        let read_from_file = SerializedFileReader::new(test_file).unwrap();
1271
1272        let file_iter = read_from_file.get_row_iter(None).unwrap();
1273        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
1274
1275        for (a, b) in file_iter.zip(cursor_iter) {
1276            assert_eq!(a.unwrap(), b.unwrap())
1277        }
1278    }
1279
1280    #[test]
1281    fn test_file_reader_try_from() {
1282        // Valid file path
1283        let test_file = get_test_file("alltypes_plain.parquet");
1284        let test_path_buf = get_test_path("alltypes_plain.parquet");
1285        let test_path = test_path_buf.as_path();
1286        let test_path_str = test_path.to_str().unwrap();
1287
1288        let reader = SerializedFileReader::try_from(test_file);
1289        assert!(reader.is_ok());
1290
1291        let reader = SerializedFileReader::try_from(test_path);
1292        assert!(reader.is_ok());
1293
1294        let reader = SerializedFileReader::try_from(test_path_str);
1295        assert!(reader.is_ok());
1296
1297        let reader = SerializedFileReader::try_from(test_path_str.to_string());
1298        assert!(reader.is_ok());
1299
1300        // Invalid file path
1301        let test_path = Path::new("invalid.parquet");
1302        let test_path_str = test_path.to_str().unwrap();
1303
1304        let reader = SerializedFileReader::try_from(test_path);
1305        assert!(reader.is_err());
1306
1307        let reader = SerializedFileReader::try_from(test_path_str);
1308        assert!(reader.is_err());
1309
1310        let reader = SerializedFileReader::try_from(test_path_str.to_string());
1311        assert!(reader.is_err());
1312    }
1313
1314    #[test]
1315    fn test_file_reader_into_iter() {
1316        let path = get_test_path("alltypes_plain.parquet");
1317        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1318        let iter = reader.into_iter();
1319        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1320
1321        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1322    }
1323
1324    #[test]
1325    fn test_file_reader_into_iter_project() {
1326        let path = get_test_path("alltypes_plain.parquet");
1327        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1328        let schema = "message schema { OPTIONAL INT32 id; }";
1329        let proj = parse_message_type(schema).ok();
1330        let iter = reader.into_iter().project(proj).unwrap();
1331        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1332
1333        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1334    }
1335
1336    #[test]
1337    fn test_reuse_file_chunk() {
1338        // This test covers the case of maintaining the correct start position in a file
1339        // stream for each column reader after initializing and moving to the next one
1340        // (without necessarily reading the entire column).
1341        let test_file = get_test_file("alltypes_plain.parquet");
1342        let reader = SerializedFileReader::new(test_file).unwrap();
1343        let row_group = reader.get_row_group(0).unwrap();
1344
1345        let mut page_readers = Vec::new();
1346        for i in 0..row_group.num_columns() {
1347            page_readers.push(row_group.get_column_page_reader(i).unwrap());
1348        }
1349
1350        // Now buffer each col reader, we do not expect any failures like:
1351        // General("underlying Thrift error: end of file")
1352        for mut page_reader in page_readers {
1353            assert!(page_reader.get_next_page().is_ok());
1354        }
1355    }
1356
1357    #[test]
1358    fn test_file_reader() {
1359        let test_file = get_test_file("alltypes_plain.parquet");
1360        let reader_result = SerializedFileReader::new(test_file);
1361        assert!(reader_result.is_ok());
1362        let reader = reader_result.unwrap();
1363
1364        // Test contents in Parquet metadata
1365        let metadata = reader.metadata();
1366        assert_eq!(metadata.num_row_groups(), 1);
1367
1368        // Test contents in file metadata
1369        let file_metadata = metadata.file_metadata();
1370        assert!(file_metadata.created_by().is_some());
1371        assert_eq!(
1372            file_metadata.created_by().unwrap(),
1373            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
1374        );
1375        assert!(file_metadata.key_value_metadata().is_none());
1376        assert_eq!(file_metadata.num_rows(), 8);
1377        assert_eq!(file_metadata.version(), 1);
1378        assert_eq!(file_metadata.column_orders(), None);
1379
1380        // Test contents in row group metadata
1381        let row_group_metadata = metadata.row_group(0);
1382        assert_eq!(row_group_metadata.num_columns(), 11);
1383        assert_eq!(row_group_metadata.num_rows(), 8);
1384        assert_eq!(row_group_metadata.total_byte_size(), 671);
1385        // Check each column order
1386        for i in 0..row_group_metadata.num_columns() {
1387            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
1388        }
1389
1390        // Test row group reader
1391        let row_group_reader_result = reader.get_row_group(0);
1392        assert!(row_group_reader_result.is_ok());
1393        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1394        assert_eq!(
1395            row_group_reader.num_columns(),
1396            row_group_metadata.num_columns()
1397        );
1398        assert_eq!(
1399            row_group_reader.metadata().total_byte_size(),
1400            row_group_metadata.total_byte_size()
1401        );
1402
1403        // Test page readers
1404        // TODO: test for every column
1405        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1406        assert!(page_reader_0_result.is_ok());
1407        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1408        let mut page_count = 0;
1409        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1410            let is_expected_page = match page {
1411                Page::DictionaryPage {
1412                    buf,
1413                    num_values,
1414                    encoding,
1415                    is_sorted,
1416                } => {
1417                    assert_eq!(buf.len(), 32);
1418                    assert_eq!(num_values, 8);
1419                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
1420                    assert!(!is_sorted);
1421                    true
1422                }
1423                Page::DataPage {
1424                    buf,
1425                    num_values,
1426                    encoding,
1427                    def_level_encoding,
1428                    rep_level_encoding,
1429                    statistics,
1430                } => {
1431                    assert_eq!(buf.len(), 11);
1432                    assert_eq!(num_values, 8);
1433                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
1434                    assert_eq!(def_level_encoding, Encoding::RLE);
1435                    #[allow(deprecated)]
1436                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
1437                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
1438                    assert!(statistics.is_none());
1439                    true
1440                }
1441                _ => false,
1442            };
1443            assert!(is_expected_page);
1444            page_count += 1;
1445        }
1446        assert_eq!(page_count, 2);
1447    }
1448
1449    #[test]
1450    fn test_file_reader_datapage_v2() {
1451        let test_file = get_test_file("datapage_v2.snappy.parquet");
1452        let reader_result = SerializedFileReader::new(test_file);
1453        assert!(reader_result.is_ok());
1454        let reader = reader_result.unwrap();
1455
1456        // Test contents in Parquet metadata
1457        let metadata = reader.metadata();
1458        assert_eq!(metadata.num_row_groups(), 1);
1459
1460        // Test contents in file metadata
1461        let file_metadata = metadata.file_metadata();
1462        assert!(file_metadata.created_by().is_some());
1463        assert_eq!(
1464            file_metadata.created_by().unwrap(),
1465            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
1466        );
1467        assert!(file_metadata.key_value_metadata().is_some());
1468        assert_eq!(
1469            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1470            1
1471        );
1472
1473        assert_eq!(file_metadata.num_rows(), 5);
1474        assert_eq!(file_metadata.version(), 1);
1475        assert_eq!(file_metadata.column_orders(), None);
1476
1477        let row_group_metadata = metadata.row_group(0);
1478
1479        // Check each column order
1480        for i in 0..row_group_metadata.num_columns() {
1481            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
1482        }
1483
1484        // Test row group reader
1485        let row_group_reader_result = reader.get_row_group(0);
1486        assert!(row_group_reader_result.is_ok());
1487        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1488        assert_eq!(
1489            row_group_reader.num_columns(),
1490            row_group_metadata.num_columns()
1491        );
1492        assert_eq!(
1493            row_group_reader.metadata().total_byte_size(),
1494            row_group_metadata.total_byte_size()
1495        );
1496
1497        // Test page readers
1498        // TODO: test for every column
1499        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1500        assert!(page_reader_0_result.is_ok());
1501        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1502        let mut page_count = 0;
1503        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1504            let is_expected_page = match page {
1505                Page::DictionaryPage {
1506                    buf,
1507                    num_values,
1508                    encoding,
1509                    is_sorted,
1510                } => {
1511                    assert_eq!(buf.len(), 7);
1512                    assert_eq!(num_values, 1);
1513                    assert_eq!(encoding, Encoding::PLAIN);
1514                    assert!(!is_sorted);
1515                    true
1516                }
1517                Page::DataPageV2 {
1518                    buf,
1519                    num_values,
1520                    encoding,
1521                    num_nulls,
1522                    num_rows,
1523                    def_levels_byte_len,
1524                    rep_levels_byte_len,
1525                    is_compressed,
1526                    statistics,
1527                } => {
1528                    assert_eq!(buf.len(), 4);
1529                    assert_eq!(num_values, 5);
1530                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1531                    assert_eq!(num_nulls, 1);
1532                    assert_eq!(num_rows, 5);
1533                    assert_eq!(def_levels_byte_len, 2);
1534                    assert_eq!(rep_levels_byte_len, 0);
1535                    assert!(is_compressed);
1536                    assert!(statistics.is_none()); // page stats are no longer read
1537                    true
1538                }
1539                _ => false,
1540            };
1541            assert!(is_expected_page);
1542            page_count += 1;
1543        }
1544        assert_eq!(page_count, 2);
1545    }
1546
1547    #[test]
1548    fn test_file_reader_empty_compressed_datapage_v2() {
1549        // this file has a compressed datapage that un-compresses to 0 bytes
1550        let test_file = get_test_file("page_v2_empty_compressed.parquet");
1551        let reader_result = SerializedFileReader::new(test_file);
1552        assert!(reader_result.is_ok());
1553        let reader = reader_result.unwrap();
1554
1555        // Test contents in Parquet metadata
1556        let metadata = reader.metadata();
1557        assert_eq!(metadata.num_row_groups(), 1);
1558
1559        // Test contents in file metadata
1560        let file_metadata = metadata.file_metadata();
1561        assert!(file_metadata.created_by().is_some());
1562        assert_eq!(
1563            file_metadata.created_by().unwrap(),
1564            "parquet-cpp-arrow version 14.0.2"
1565        );
1566        assert!(file_metadata.key_value_metadata().is_some());
1567        assert_eq!(
1568            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1569            1
1570        );
1571
1572        assert_eq!(file_metadata.num_rows(), 10);
1573        assert_eq!(file_metadata.version(), 2);
1574        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1575        assert_eq!(
1576            file_metadata.column_orders(),
1577            Some(vec![expected_order].as_ref())
1578        );
1579
1580        let row_group_metadata = metadata.row_group(0);
1581
1582        // Check each column order
1583        for i in 0..row_group_metadata.num_columns() {
1584            assert_eq!(file_metadata.column_order(i), expected_order);
1585        }
1586
1587        // Test row group reader
1588        let row_group_reader_result = reader.get_row_group(0);
1589        assert!(row_group_reader_result.is_ok());
1590        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1591        assert_eq!(
1592            row_group_reader.num_columns(),
1593            row_group_metadata.num_columns()
1594        );
1595        assert_eq!(
1596            row_group_reader.metadata().total_byte_size(),
1597            row_group_metadata.total_byte_size()
1598        );
1599
1600        // Test page readers
1601        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1602        assert!(page_reader_0_result.is_ok());
1603        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1604        let mut page_count = 0;
1605        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1606            let is_expected_page = match page {
1607                Page::DictionaryPage {
1608                    buf,
1609                    num_values,
1610                    encoding,
1611                    is_sorted,
1612                } => {
1613                    assert_eq!(buf.len(), 0);
1614                    assert_eq!(num_values, 0);
1615                    assert_eq!(encoding, Encoding::PLAIN);
1616                    assert!(!is_sorted);
1617                    true
1618                }
1619                Page::DataPageV2 {
1620                    buf,
1621                    num_values,
1622                    encoding,
1623                    num_nulls,
1624                    num_rows,
1625                    def_levels_byte_len,
1626                    rep_levels_byte_len,
1627                    is_compressed,
1628                    statistics,
1629                } => {
1630                    assert_eq!(buf.len(), 3);
1631                    assert_eq!(num_values, 10);
1632                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1633                    assert_eq!(num_nulls, 10);
1634                    assert_eq!(num_rows, 10);
1635                    assert_eq!(def_levels_byte_len, 2);
1636                    assert_eq!(rep_levels_byte_len, 0);
1637                    assert!(is_compressed);
1638                    assert!(statistics.is_none()); // page stats are no longer read
1639                    true
1640                }
1641                _ => false,
1642            };
1643            assert!(is_expected_page);
1644            page_count += 1;
1645        }
1646        assert_eq!(page_count, 2);
1647    }
1648
1649    #[test]
1650    fn test_file_reader_empty_datapage_v2() {
1651        // this file has 0 bytes compressed datapage that un-compresses to 0 bytes
1652        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
1653        let reader_result = SerializedFileReader::new(test_file);
1654        assert!(reader_result.is_ok());
1655        let reader = reader_result.unwrap();
1656
1657        // Test contents in Parquet metadata
1658        let metadata = reader.metadata();
1659        assert_eq!(metadata.num_row_groups(), 1);
1660
1661        // Test contents in file metadata
1662        let file_metadata = metadata.file_metadata();
1663        assert!(file_metadata.created_by().is_some());
1664        assert_eq!(
1665            file_metadata.created_by().unwrap(),
1666            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
1667        );
1668        assert!(file_metadata.key_value_metadata().is_some());
1669        assert_eq!(
1670            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1671            2
1672        );
1673
1674        assert_eq!(file_metadata.num_rows(), 1);
1675        assert_eq!(file_metadata.version(), 1);
1676        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1677        assert_eq!(
1678            file_metadata.column_orders(),
1679            Some(vec![expected_order].as_ref())
1680        );
1681
1682        let row_group_metadata = metadata.row_group(0);
1683
1684        // Check each column order
1685        for i in 0..row_group_metadata.num_columns() {
1686            assert_eq!(file_metadata.column_order(i), expected_order);
1687        }
1688
1689        // Test row group reader
1690        let row_group_reader_result = reader.get_row_group(0);
1691        assert!(row_group_reader_result.is_ok());
1692        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1693        assert_eq!(
1694            row_group_reader.num_columns(),
1695            row_group_metadata.num_columns()
1696        );
1697        assert_eq!(
1698            row_group_reader.metadata().total_byte_size(),
1699            row_group_metadata.total_byte_size()
1700        );
1701
1702        // Test page readers
1703        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1704        assert!(page_reader_0_result.is_ok());
1705        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1706        let mut page_count = 0;
1707        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1708            let is_expected_page = match page {
1709                Page::DataPageV2 {
1710                    buf,
1711                    num_values,
1712                    encoding,
1713                    num_nulls,
1714                    num_rows,
1715                    def_levels_byte_len,
1716                    rep_levels_byte_len,
1717                    is_compressed,
1718                    statistics,
1719                } => {
1720                    assert_eq!(buf.len(), 2);
1721                    assert_eq!(num_values, 1);
1722                    assert_eq!(encoding, Encoding::PLAIN);
1723                    assert_eq!(num_nulls, 1);
1724                    assert_eq!(num_rows, 1);
1725                    assert_eq!(def_levels_byte_len, 2);
1726                    assert_eq!(rep_levels_byte_len, 0);
1727                    assert!(is_compressed);
1728                    assert!(statistics.is_none());
1729                    true
1730                }
1731                _ => false,
1732            };
1733            assert!(is_expected_page);
1734            page_count += 1;
1735        }
1736        assert_eq!(page_count, 1);
1737    }
1738
1739    fn get_serialized_page_reader<R: ChunkReader>(
1740        file_reader: &SerializedFileReader<R>,
1741        row_group: usize,
1742        column: usize,
1743    ) -> Result<SerializedPageReader<R>> {
1744        let row_group = {
1745            let row_group_metadata = file_reader.metadata.row_group(row_group);
1746            let props = Arc::clone(&file_reader.props);
1747            let f = Arc::clone(&file_reader.chunk_reader);
1748            SerializedRowGroupReader::new(
1749                f,
1750                row_group_metadata,
1751                file_reader
1752                    .metadata
1753                    .offset_index()
1754                    .map(|x| x[row_group].as_slice()),
1755                props,
1756            )?
1757        };
1758
1759        let col = row_group.metadata.column(column);
1760
1761        let page_locations = row_group
1762            .offset_index
1763            .map(|x| x[column].page_locations.clone());
1764
1765        let props = Arc::clone(&row_group.props);
1766        SerializedPageReader::new_with_properties(
1767            Arc::clone(&row_group.chunk_reader),
1768            col,
1769            usize::try_from(row_group.metadata.num_rows())?,
1770            page_locations,
1771            props,
1772        )
1773    }
1774
1775    #[test]
1776    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
1777        let test_file = get_test_file("alltypes_plain.parquet");
1778        let reader = SerializedFileReader::new(test_file)?;
1779
1780        let mut offset_set = HashSet::new();
1781        let num_row_groups = reader.metadata.num_row_groups();
1782        for row_group in 0..num_row_groups {
1783            let num_columns = reader.metadata.row_group(row_group).num_columns();
1784            for column in 0..num_columns {
1785                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;
1786
1787                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
1788                    match &page_reader.state {
1789                        SerializedPageReaderState::Pages {
1790                            page_locations,
1791                            dictionary_page,
1792                            ..
1793                        } => {
1794                            if let Some(page) = dictionary_page {
1795                                assert_eq!(page.offset as u64, page_offset);
1796                            } else if let Some(page) = page_locations.front() {
1797                                assert_eq!(page.offset as u64, page_offset);
1798                            } else {
1799                                unreachable!()
1800                            }
1801                        }
1802                        SerializedPageReaderState::Values {
1803                            offset,
1804                            next_page_header,
1805                            ..
1806                        } => {
1807                            assert!(next_page_header.is_some());
1808                            assert_eq!(*offset, page_offset);
1809                        }
1810                    }
1811                    let page = page_reader.get_next_page()?;
1812                    assert!(page.is_some());
1813                    let newly_inserted = offset_set.insert(page_offset);
1814                    assert!(newly_inserted);
1815                }
1816            }
1817        }
1818
1819        Ok(())
1820    }
1821
1822    #[test]
1823    fn test_page_iterator() {
1824        let file = get_test_file("alltypes_plain.parquet");
1825        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1826
1827        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1828
1829        // read first page
1830        let page = page_iterator.next();
1831        assert!(page.is_some());
1832        assert!(page.unwrap().is_ok());
1833
1834        // reach end of file
1835        let page = page_iterator.next();
1836        assert!(page.is_none());
1837
1838        let row_group_indices = Box::new(0..1);
1839        let mut page_iterator =
1840            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1841
1842        // read first page
1843        let page = page_iterator.next();
1844        assert!(page.is_some());
1845        assert!(page.unwrap().is_ok());
1846
1847        // reach end of file
1848        let page = page_iterator.next();
1849        assert!(page.is_none());
1850    }
1851
1852    #[test]
1853    fn test_file_reader_key_value_metadata() {
1854        let file = get_test_file("binary.parquet");
1855        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1856
1857        let metadata = file_reader
1858            .metadata
1859            .file_metadata()
1860            .key_value_metadata()
1861            .unwrap();
1862
1863        assert_eq!(metadata.len(), 3);
1864
1865        assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1866
1867        assert_eq!(metadata[1].key, "writer.model.name");
1868        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1869
1870        assert_eq!(metadata[2].key, "parquet.proto.class");
1871        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1872    }
1873
1874    #[test]
1875    fn test_file_reader_optional_metadata() {
1876        // file with optional metadata: bloom filters, encoding stats, column index and offset index.
1877        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1878        let options = ReadOptionsBuilder::new()
1879            .with_encoding_stats_as_mask(false)
1880            .build();
1881        let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());
1882
1883        let row_group_metadata = file_reader.metadata.row_group(0);
1884        let col0_metadata = row_group_metadata.column(0);
1885
1886        // test optional bloom filter offset
1887        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1888
1889        // test page encoding stats
1890        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1891
1892        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1893        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1894        assert_eq!(page_encoding_stats.count, 1);
1895
1896        // test optional column index offset
1897        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1898        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1899
1900        // test optional offset index offset
1901        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1902        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1903    }
1904
1905    #[test]
1906    fn test_file_reader_page_stats_mask() {
1907        let file = get_test_file("alltypes_tiny_pages.parquet");
1908        let options = ReadOptionsBuilder::new()
1909            .with_encoding_stats_as_mask(true)
1910            .build();
1911        let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());
1912
1913        let row_group_metadata = file_reader.metadata.row_group(0);
1914
1915        // test page encoding stats
1916        let page_encoding_stats = row_group_metadata
1917            .column(0)
1918            .page_encoding_stats_mask()
1919            .unwrap();
1920        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1921        let page_encoding_stats = row_group_metadata
1922            .column(2)
1923            .page_encoding_stats_mask()
1924            .unwrap();
1925        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1926    }
1927
1928    #[test]
1929    fn test_file_reader_page_stats_skipped() {
1930        let file = get_test_file("alltypes_tiny_pages.parquet");
1931
1932        // test skipping all
1933        let options = ReadOptionsBuilder::new()
1934            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1935            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll)
1936            .build();
1937        let file_reader = Arc::new(
1938            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1939        );
1940
1941        let row_group_metadata = file_reader.metadata.row_group(0);
1942        for column in row_group_metadata.columns() {
1943            assert!(column.page_encoding_stats().is_none());
1944            assert!(column.page_encoding_stats_mask().is_none());
1945            assert!(column.statistics().is_none());
1946        }
1947
1948        // test skipping all but one column
1949        let options = ReadOptionsBuilder::new()
1950            .with_encoding_stats_as_mask(true)
1951            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1952            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1953            .build();
1954        let file_reader = Arc::new(
1955            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1956        );
1957
1958        let row_group_metadata = file_reader.metadata.row_group(0);
1959        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1960            assert!(column.page_encoding_stats().is_none());
1961            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1962            assert_eq!(column.statistics().is_some(), idx == 0);
1963        }
1964    }
1965
1966    #[test]
1967    fn test_file_reader_size_stats_skipped() {
1968        let file = get_test_file("repeated_primitive_no_list.parquet");
1969
1970        // test skipping all
1971        let options = ReadOptionsBuilder::new()
1972            .with_size_stats_policy(ParquetStatisticsPolicy::SkipAll)
1973            .build();
1974        let file_reader = Arc::new(
1975            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1976        );
1977
1978        let row_group_metadata = file_reader.metadata.row_group(0);
1979        for column in row_group_metadata.columns() {
1980            assert!(column.repetition_level_histogram().is_none());
1981            assert!(column.definition_level_histogram().is_none());
1982            assert!(column.unencoded_byte_array_data_bytes().is_none());
1983        }
1984
1985        // test skipping all but one column
1986        let options = ReadOptionsBuilder::new()
1987            .with_encoding_stats_as_mask(true)
1988            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]))
1989            .build();
1990        let file_reader = Arc::new(
1991            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1992        );
1993
1994        let row_group_metadata = file_reader.metadata.row_group(0);
1995        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1996            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
1997            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
1998            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
1999        }
2000    }
2001
2002    #[test]
2003    fn test_file_reader_with_no_filter() -> Result<()> {
2004        let test_file = get_test_file("alltypes_plain.parquet");
2005        let origin_reader = SerializedFileReader::new(test_file)?;
2006        // test initial number of row groups
2007        let metadata = origin_reader.metadata();
2008        assert_eq!(metadata.num_row_groups(), 1);
2009        Ok(())
2010    }
2011
2012    #[test]
2013    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
2014        let test_file = get_test_file("alltypes_plain.parquet");
2015        let read_options = ReadOptionsBuilder::new()
2016            .with_predicate(Box::new(|_, _| false))
2017            .build();
2018        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2019        let metadata = reader.metadata();
2020        assert_eq!(metadata.num_row_groups(), 0);
2021        Ok(())
2022    }
2023
2024    #[test]
2025    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
2026        let test_file = get_test_file("alltypes_plain.parquet");
2027        let origin_reader = SerializedFileReader::new(test_file)?;
2028        // test initial number of row groups
2029        let metadata = origin_reader.metadata();
2030        assert_eq!(metadata.num_row_groups(), 1);
2031        let mid = get_midpoint_offset(metadata.row_group(0));
2032
2033        let test_file = get_test_file("alltypes_plain.parquet");
2034        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
2035        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2036        let metadata = reader.metadata();
2037        assert_eq!(metadata.num_row_groups(), 1);
2038
2039        let test_file = get_test_file("alltypes_plain.parquet");
2040        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
2041        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2042        let metadata = reader.metadata();
2043        assert_eq!(metadata.num_row_groups(), 0);
2044        Ok(())
2045    }
2046
2047    #[test]
2048    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
2049        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2050        let origin_reader = SerializedFileReader::new(test_file)?;
2051        let metadata = origin_reader.metadata();
2052        let mid = get_midpoint_offset(metadata.row_group(0));
2053
2054        // true, true predicate
2055        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2056        let read_options = ReadOptionsBuilder::new()
2057            .with_page_index()
2058            .with_predicate(Box::new(|_, _| true))
2059            .with_range(mid, mid + 1)
2060            .build();
2061        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2062        let metadata = reader.metadata();
2063        assert_eq!(metadata.num_row_groups(), 1);
2064        assert_eq!(metadata.column_index().unwrap().len(), 1);
2065        assert_eq!(metadata.offset_index().unwrap().len(), 1);
2066
2067        // true, false predicate
2068        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2069        let read_options = ReadOptionsBuilder::new()
2070            .with_page_index()
2071            .with_predicate(Box::new(|_, _| true))
2072            .with_range(0, mid)
2073            .build();
2074        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2075        let metadata = reader.metadata();
2076        assert_eq!(metadata.num_row_groups(), 0);
2077        assert!(metadata.column_index().is_none());
2078        assert!(metadata.offset_index().is_none());
2079
2080        // false, true predicate
2081        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2082        let read_options = ReadOptionsBuilder::new()
2083            .with_page_index()
2084            .with_predicate(Box::new(|_, _| false))
2085            .with_range(mid, mid + 1)
2086            .build();
2087        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2088        let metadata = reader.metadata();
2089        assert_eq!(metadata.num_row_groups(), 0);
2090        assert!(metadata.column_index().is_none());
2091        assert!(metadata.offset_index().is_none());
2092
2093        // false, false predicate
2094        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2095        let read_options = ReadOptionsBuilder::new()
2096            .with_page_index()
2097            .with_predicate(Box::new(|_, _| false))
2098            .with_range(0, mid)
2099            .build();
2100        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2101        let metadata = reader.metadata();
2102        assert_eq!(metadata.num_row_groups(), 0);
2103        assert!(metadata.column_index().is_none());
2104        assert!(metadata.offset_index().is_none());
2105        Ok(())
2106    }
2107
2108    #[test]
2109    fn test_file_reader_invalid_metadata() {
2110        let data = [
2111            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
2112            80, 65, 82, 49,
2113        ];
2114        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
2115        assert_eq!(
2116            ret.err().unwrap().to_string(),
2117            "Parquet error: Expected list element type of Struct but got List"
2118        );
2119    }
2120
2121    #[test]
2122    // Use java parquet-tools get below pageIndex info
2123    // !```
2124    // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
2125    // row group 0:
2126    // column index for column String:
2127    // Boundary order: ASCENDING
2128    // page-0  :
2129    // null count                 min                                  max
2130    // 0                          Hello                                today
2131    //
2132    // offset index for column String:
2133    // page-0   :
2134    // offset   compressed size       first row index
2135    // 4               152                     0
2136    ///```
2137    //
2138    fn test_page_index_reader() {
2139        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
2140        let builder = ReadOptionsBuilder::new();
2141        //enable read page index
2142        let options = builder.with_page_index().build();
2143        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2144        let reader = reader_result.unwrap();
2145
2146        // Test contents in Parquet metadata
2147        let metadata = reader.metadata();
2148        assert_eq!(metadata.num_row_groups(), 1);
2149
2150        let column_index = metadata.column_index().unwrap();
2151
2152        // only one row group
2153        assert_eq!(column_index.len(), 1);
2154        let index = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
2155            index
2156        } else {
2157            unreachable!()
2158        };
2159
2160        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
2161
2162        //only one page group
2163        assert_eq!(index.num_pages(), 1);
2164
2165        let min = index.min_value(0).unwrap();
2166        let max = index.max_value(0).unwrap();
2167        assert_eq!(b"Hello", min.as_bytes());
2168        assert_eq!(b"today", max.as_bytes());
2169
2170        let offset_indexes = metadata.offset_index().unwrap();
2171        // only one row group
2172        assert_eq!(offset_indexes.len(), 1);
2173        let offset_index = &offset_indexes[0];
2174        let page_offset = &offset_index[0].page_locations()[0];
2175
2176        assert_eq!(4, page_offset.offset);
2177        assert_eq!(152, page_offset.compressed_page_size);
2178        assert_eq!(0, page_offset.first_row_index);
2179    }
2180
2181    #[test]
2182    fn test_page_index_reader_all_type() {
2183        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2184        let builder = ReadOptionsBuilder::new();
2185        //enable read page index
2186        let options = builder.with_page_index().build();
2187        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2188        let reader = reader_result.unwrap();
2189
2190        // Test contents in Parquet metadata
2191        let metadata = reader.metadata();
2192        assert_eq!(metadata.num_row_groups(), 1);
2193
2194        let column_index = metadata.column_index().unwrap();
2195        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
2196
2197        // only one row group
2198        assert_eq!(column_index.len(), 1);
2199        let row_group_metadata = metadata.row_group(0);
2200
2201        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
2202        assert!(!&column_index[0][0].is_sorted());
2203        let boundary_order = &column_index[0][0].get_boundary_order();
2204        assert!(boundary_order.is_some());
2205        matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
2206        if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
2207            check_native_page_index(
2208                index,
2209                325,
2210                get_row_group_min_max_bytes(row_group_metadata, 0),
2211                BoundaryOrder::UNORDERED,
2212            );
2213            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2214        } else {
2215            unreachable!()
2216        };
2217        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
2218        assert!(&column_index[0][1].is_sorted());
2219        if let ColumnIndexMetaData::BOOLEAN(index) = &column_index[0][1] {
2220            assert_eq!(index.num_pages(), 82);
2221            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2222        } else {
2223            unreachable!()
2224        };
2225        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2226        assert!(&column_index[0][2].is_sorted());
2227        if let ColumnIndexMetaData::INT32(index) = &column_index[0][2] {
2228            check_native_page_index(
2229                index,
2230                325,
2231                get_row_group_min_max_bytes(row_group_metadata, 2),
2232                BoundaryOrder::ASCENDING,
2233            );
2234            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2235        } else {
2236            unreachable!()
2237        };
2238        //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2239        assert!(&column_index[0][3].is_sorted());
2240        if let ColumnIndexMetaData::INT32(index) = &column_index[0][3] {
2241            check_native_page_index(
2242                index,
2243                325,
2244                get_row_group_min_max_bytes(row_group_metadata, 3),
2245                BoundaryOrder::ASCENDING,
2246            );
2247            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2248        } else {
2249            unreachable!()
2250        };
2251        //col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2252        assert!(&column_index[0][4].is_sorted());
2253        if let ColumnIndexMetaData::INT32(index) = &column_index[0][4] {
2254            check_native_page_index(
2255                index,
2256                325,
2257                get_row_group_min_max_bytes(row_group_metadata, 4),
2258                BoundaryOrder::ASCENDING,
2259            );
2260            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2261        } else {
2262            unreachable!()
2263        };
2264        //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
2265        assert!(!&column_index[0][5].is_sorted());
2266        if let ColumnIndexMetaData::INT64(index) = &column_index[0][5] {
2267            check_native_page_index(
2268                index,
2269                528,
2270                get_row_group_min_max_bytes(row_group_metadata, 5),
2271                BoundaryOrder::UNORDERED,
2272            );
2273            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2274        } else {
2275            unreachable!()
2276        };
2277        //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
2278        assert!(&column_index[0][6].is_sorted());
2279        if let ColumnIndexMetaData::FLOAT(index) = &column_index[0][6] {
2280            check_native_page_index(
2281                index,
2282                325,
2283                get_row_group_min_max_bytes(row_group_metadata, 6),
2284                BoundaryOrder::ASCENDING,
2285            );
2286            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2287        } else {
2288            unreachable!()
2289        };
2290        //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
2291        assert!(!&column_index[0][7].is_sorted());
2292        if let ColumnIndexMetaData::DOUBLE(index) = &column_index[0][7] {
2293            check_native_page_index(
2294                index,
2295                528,
2296                get_row_group_min_max_bytes(row_group_metadata, 7),
2297                BoundaryOrder::UNORDERED,
2298            );
2299            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2300        } else {
2301            unreachable!()
2302        };
2303        //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
2304        assert!(!&column_index[0][8].is_sorted());
2305        if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][8] {
2306            check_byte_array_page_index(
2307                index,
2308                974,
2309                get_row_group_min_max_bytes(row_group_metadata, 8),
2310                BoundaryOrder::UNORDERED,
2311            );
2312            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2313        } else {
2314            unreachable!()
2315        };
2316        //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2317        assert!(&column_index[0][9].is_sorted());
2318        if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][9] {
2319            check_byte_array_page_index(
2320                index,
2321                352,
2322                get_row_group_min_max_bytes(row_group_metadata, 9),
2323                BoundaryOrder::ASCENDING,
2324            );
2325            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2326        } else {
2327            unreachable!()
2328        };
2329        //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
2330        //Notice: min_max values for each page for this col not exits.
2331        assert!(!&column_index[0][10].is_sorted());
2332        if let ColumnIndexMetaData::NONE = &column_index[0][10] {
2333            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2334        } else {
2335            unreachable!()
2336        };
2337        //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
2338        assert!(&column_index[0][11].is_sorted());
2339        if let ColumnIndexMetaData::INT32(index) = &column_index[0][11] {
2340            check_native_page_index(
2341                index,
2342                325,
2343                get_row_group_min_max_bytes(row_group_metadata, 11),
2344                BoundaryOrder::ASCENDING,
2345            );
2346            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2347        } else {
2348            unreachable!()
2349        };
2350        //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
2351        assert!(!&column_index[0][12].is_sorted());
2352        if let ColumnIndexMetaData::INT32(index) = &column_index[0][12] {
2353            check_native_page_index(
2354                index,
2355                325,
2356                get_row_group_min_max_bytes(row_group_metadata, 12),
2357                BoundaryOrder::UNORDERED,
2358            );
2359            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2360        } else {
2361            unreachable!()
2362        };
2363    }
2364
2365    fn check_native_page_index<T: ParquetValueType>(
2366        row_group_index: &PrimitiveColumnIndex<T>,
2367        page_size: usize,
2368        min_max: (&[u8], &[u8]),
2369        boundary_order: BoundaryOrder,
2370    ) {
2371        assert_eq!(row_group_index.num_pages() as usize, page_size);
2372        assert_eq!(row_group_index.boundary_order, boundary_order);
2373        assert!(row_group_index.min_values().iter().all(|x| {
2374            x >= &T::try_from_le_slice(min_max.0).unwrap()
2375                && x <= &T::try_from_le_slice(min_max.1).unwrap()
2376        }));
2377    }
2378
2379    fn check_byte_array_page_index(
2380        row_group_index: &ByteArrayColumnIndex,
2381        page_size: usize,
2382        min_max: (&[u8], &[u8]),
2383        boundary_order: BoundaryOrder,
2384    ) {
2385        assert_eq!(row_group_index.num_pages() as usize, page_size);
2386        assert_eq!(row_group_index.boundary_order, boundary_order);
2387        for i in 0..row_group_index.num_pages() as usize {
2388            let x = row_group_index.min_value(i).unwrap();
2389            assert!(x >= min_max.0 && x <= min_max.1);
2390        }
2391    }
2392
2393    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2394        let statistics = r.column(col_num).statistics().unwrap();
2395        (
2396            statistics.min_bytes_opt().unwrap_or_default(),
2397            statistics.max_bytes_opt().unwrap_or_default(),
2398        )
2399    }
2400
2401    #[test]
2402    fn test_skip_next_page_with_dictionary_page() {
2403        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2404        let builder = ReadOptionsBuilder::new();
2405        // enable read page index
2406        let options = builder.with_page_index().build();
2407        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2408        let reader = reader_result.unwrap();
2409
2410        let row_group_reader = reader.get_row_group(0).unwrap();
2411
2412        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
2413        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2414
2415        let mut vec = vec![];
2416
2417        // Step 1: Peek and ensure dictionary page is correctly identified
2418        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2419        assert!(meta.is_dict);
2420
2421        // Step 2: Call skip_next_page to skip the dictionary page
2422        column_page_reader.skip_next_page().unwrap();
2423
2424        // Step 3: Read the next data page after skipping the dictionary page
2425        let page = column_page_reader.get_next_page().unwrap().unwrap();
2426        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2427
2428        // Step 4: Continue reading remaining data pages and verify correctness
2429        for _i in 0..351 {
2430            // 352 total pages, 1 dictionary page is skipped
2431            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2432            assert!(!meta.is_dict); // Verify no dictionary page here
2433            vec.push(meta);
2434
2435            let page = column_page_reader.get_next_page().unwrap().unwrap();
2436            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2437        }
2438
2439        // Step 5: Check if all pages are read
2440        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2441        assert!(column_page_reader.get_next_page().unwrap().is_none());
2442
2443        // Step 6: Verify the number of data pages read (should be 351 data pages)
2444        assert_eq!(vec.len(), 351);
2445    }
2446
2447    #[test]
2448    fn test_skip_page_with_offset_index() {
2449        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2450        let builder = ReadOptionsBuilder::new();
2451        //enable read page index
2452        let options = builder.with_page_index().build();
2453        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2454        let reader = reader_result.unwrap();
2455
2456        let row_group_reader = reader.get_row_group(0).unwrap();
2457
2458        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
2459        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2460
2461        let mut vec = vec![];
2462
2463        for i in 0..325 {
2464            if i % 2 == 0 {
2465                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2466            } else {
2467                column_page_reader.skip_next_page().unwrap();
2468            }
2469        }
2470        //check read all pages.
2471        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2472        assert!(column_page_reader.get_next_page().unwrap().is_none());
2473
2474        assert_eq!(vec.len(), 163);
2475    }
2476
2477    #[test]
2478    fn test_skip_page_without_offset_index() {
2479        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2480
2481        // use default SerializedFileReader without read offsetIndex
2482        let reader_result = SerializedFileReader::new(test_file);
2483        let reader = reader_result.unwrap();
2484
2485        let row_group_reader = reader.get_row_group(0).unwrap();
2486
2487        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
2488        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2489
2490        let mut vec = vec![];
2491
2492        for i in 0..325 {
2493            if i % 2 == 0 {
2494                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2495            } else {
2496                column_page_reader.peek_next_page().unwrap().unwrap();
2497                column_page_reader.skip_next_page().unwrap();
2498            }
2499        }
2500        //check read all pages.
2501        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2502        assert!(column_page_reader.get_next_page().unwrap().is_none());
2503
2504        assert_eq!(vec.len(), 163);
2505    }
2506
2507    #[test]
2508    fn test_peek_page_with_dictionary_page() {
2509        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2510        let builder = ReadOptionsBuilder::new();
2511        //enable read page index
2512        let options = builder.with_page_index().build();
2513        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2514        let reader = reader_result.unwrap();
2515        let row_group_reader = reader.get_row_group(0).unwrap();
2516
2517        //use 'string_col', Boundary order: UNORDERED, total 352 data ages and 1 dictionary page.
2518        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2519
2520        let mut vec = vec![];
2521
2522        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2523        assert!(meta.is_dict);
2524        let page = column_page_reader.get_next_page().unwrap().unwrap();
2525        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2526
2527        for i in 0..352 {
2528            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2529            // have checked with `parquet-tools column-index   -c string_col  ./alltypes_tiny_pages.parquet`
2530            // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
2531            if i != 351 {
2532                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2533            } else {
2534                // last page first row index is 7290, total row count is 7300
2535                // because first row start with zero, last page row count should be 10.
2536                assert_eq!(meta.num_rows, Some(10));
2537            }
2538            assert!(!meta.is_dict);
2539            vec.push(meta);
2540            let page = column_page_reader.get_next_page().unwrap().unwrap();
2541            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2542        }
2543
2544        //check read all pages.
2545        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2546        assert!(column_page_reader.get_next_page().unwrap().is_none());
2547
2548        assert_eq!(vec.len(), 352);
2549    }
2550
2551    #[test]
2552    fn test_peek_page_with_dictionary_page_without_offset_index() {
2553        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2554
2555        let reader_result = SerializedFileReader::new(test_file);
2556        let reader = reader_result.unwrap();
2557        let row_group_reader = reader.get_row_group(0).unwrap();
2558
2559        //use 'string_col', Boundary order: UNORDERED, total 352 data ages and 1 dictionary page.
2560        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2561
2562        let mut vec = vec![];
2563
2564        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2565        assert!(meta.is_dict);
2566        let page = column_page_reader.get_next_page().unwrap().unwrap();
2567        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2568
2569        for i in 0..352 {
2570            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2571            // have checked with `parquet-tools column-index   -c string_col  ./alltypes_tiny_pages.parquet`
2572            // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
2573            if i != 351 {
2574                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2575            } else {
2576                // last page first row index is 7290, total row count is 7300
2577                // because first row start with zero, last page row count should be 10.
2578                assert_eq!(meta.num_levels, Some(10));
2579            }
2580            assert!(!meta.is_dict);
2581            vec.push(meta);
2582            let page = column_page_reader.get_next_page().unwrap().unwrap();
2583            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2584        }
2585
2586        //check read all pages.
2587        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2588        assert!(column_page_reader.get_next_page().unwrap().is_none());
2589
2590        assert_eq!(vec.len(), 352);
2591    }
2592
2593    #[test]
2594    fn test_fixed_length_index() {
2595        let message_type = "
2596        message test_schema {
2597          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2598        }
2599        ";
2600
2601        let schema = parse_message_type(message_type).unwrap();
2602        let mut out = Vec::with_capacity(1024);
2603        let mut writer =
2604            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2605
2606        let mut r = writer.next_row_group().unwrap();
2607        let mut c = r.next_column().unwrap().unwrap();
2608        c.typed::<FixedLenByteArrayType>()
2609            .write_batch(
2610                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2611                Some(&[1, 1, 0, 1]),
2612                None,
2613            )
2614            .unwrap();
2615        c.close().unwrap();
2616        r.close().unwrap();
2617        writer.close().unwrap();
2618
2619        let b = Bytes::from(out);
2620        let options = ReadOptionsBuilder::new().with_page_index().build();
2621        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2622        let index = reader.metadata().column_index().unwrap();
2623
2624        // 1 row group
2625        assert_eq!(index.len(), 1);
2626        let c = &index[0];
2627        // 1 column
2628        assert_eq!(c.len(), 1);
2629
2630        match &c[0] {
2631            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
2632                assert_eq!(v.num_pages(), 1);
2633                assert_eq!(v.null_count(0).unwrap(), 1);
2634                assert_eq!(v.min_value(0).unwrap(), &[0; 11]);
2635                assert_eq!(v.max_value(0).unwrap(), &[5; 11]);
2636            }
2637            _ => unreachable!(),
2638        }
2639    }
2640
2641    #[test]
2642    fn test_multi_gz() {
2643        let file = get_test_file("concatenated_gzip_members.parquet");
2644        let reader = SerializedFileReader::new(file).unwrap();
2645        let row_group_reader = reader.get_row_group(0).unwrap();
2646        match row_group_reader.get_column_reader(0).unwrap() {
2647            ColumnReader::Int64ColumnReader(mut reader) => {
2648                let mut buffer = Vec::with_capacity(1024);
2649                let mut def_levels = Vec::with_capacity(1024);
2650                let (num_records, num_values, num_levels) = reader
2651                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2652                    .unwrap();
2653
2654                assert_eq!(num_records, 513);
2655                assert_eq!(num_values, 513);
2656                assert_eq!(num_levels, 513);
2657
2658                let expected: Vec<i64> = (1..514).collect();
2659                assert_eq!(&buffer, &expected);
2660            }
2661            _ => unreachable!(),
2662        }
2663    }
2664
2665    #[test]
2666    fn test_byte_stream_split_extended() {
2667        let path = format!(
2668            "{}/byte_stream_split_extended.gzip.parquet",
2669            arrow::util::test_util::parquet_test_data(),
2670        );
2671        let file = File::open(path).unwrap();
2672        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2673
2674        // Use full schema as projected schema
2675        let mut iter = reader
2676            .get_row_iter(None)
2677            .expect("Failed to create row iterator");
2678
2679        let mut start = 0;
2680        let end = reader.metadata().file_metadata().num_rows();
2681
2682        let check_row = |row: Result<Row, ParquetError>| {
2683            assert!(row.is_ok());
2684            let r = row.unwrap();
2685            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2686            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2687            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2688            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2689            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2690            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2691            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2692        };
2693
2694        while start < end {
2695            match iter.next() {
2696                Some(row) => check_row(row),
2697                None => break,
2698            };
2699            start += 1;
2700        }
2701    }
2702
2703    #[test]
2704    fn test_filtered_rowgroup_metadata() {
2705        let message_type = "
2706            message test_schema {
2707                REQUIRED INT32 a;
2708            }
2709        ";
2710        let schema = Arc::new(parse_message_type(message_type).unwrap());
2711        let props = Arc::new(
2712            WriterProperties::builder()
2713                .set_statistics_enabled(EnabledStatistics::Page)
2714                .build(),
2715        );
2716        let mut file: File = tempfile::tempfile().unwrap();
2717        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2718        let data = [1, 2, 3, 4, 5];
2719
2720        // write 5 row groups
2721        for idx in 0..5 {
2722            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2723            let mut row_group_writer = file_writer.next_row_group().unwrap();
2724            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2725                writer
2726                    .typed::<Int32Type>()
2727                    .write_batch(data_i.as_slice(), None, None)
2728                    .unwrap();
2729                writer.close().unwrap();
2730            }
2731            row_group_writer.close().unwrap();
2732            file_writer.flushed_row_groups();
2733        }
2734        let file_metadata = file_writer.close().unwrap();
2735
2736        assert_eq!(file_metadata.file_metadata().num_rows(), 25);
2737        assert_eq!(file_metadata.num_row_groups(), 5);
2738
2739        // read only the 3rd row group
2740        let read_options = ReadOptionsBuilder::new()
2741            .with_page_index()
2742            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2743            .build();
2744        let reader =
2745            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2746                .unwrap();
2747        let metadata = reader.metadata();
2748
2749        // check we got the expected row group
2750        assert_eq!(metadata.num_row_groups(), 1);
2751        assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2752
2753        // check we only got the relevant page indexes
2754        assert!(metadata.column_index().is_some());
2755        assert!(metadata.offset_index().is_some());
2756        assert_eq!(metadata.column_index().unwrap().len(), 1);
2757        assert_eq!(metadata.offset_index().unwrap().len(), 1);
2758        let col_idx = metadata.column_index().unwrap();
2759        let off_idx = metadata.offset_index().unwrap();
2760        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2761        let pg_idx = &col_idx[0][0];
2762        let off_idx_i = &off_idx[0][0];
2763
2764        // test that we got the index matching the row group
2765        match pg_idx {
2766            ColumnIndexMetaData::INT32(int_idx) => {
2767                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2768                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2769                assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2770                assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2771            }
2772            _ => panic!("wrong stats type"),
2773        }
2774
2775        // check offset index matches too
2776        assert_eq!(
2777            off_idx_i.page_locations[0].offset,
2778            metadata.row_group(0).column(0).data_page_offset()
2779        );
2780
2781        // read non-contiguous row groups
2782        let read_options = ReadOptionsBuilder::new()
2783            .with_page_index()
2784            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2785            .build();
2786        let reader =
2787            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2788                .unwrap();
2789        let metadata = reader.metadata();
2790
2791        // check we got the expected row groups
2792        assert_eq!(metadata.num_row_groups(), 2);
2793        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2794        assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2795
2796        // check we only got the relevant page indexes
2797        assert!(metadata.column_index().is_some());
2798        assert!(metadata.offset_index().is_some());
2799        assert_eq!(metadata.column_index().unwrap().len(), 2);
2800        assert_eq!(metadata.offset_index().unwrap().len(), 2);
2801        let col_idx = metadata.column_index().unwrap();
2802        let off_idx = metadata.offset_index().unwrap();
2803
2804        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2805            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2806            let pg_idx = &col_idx_i[0];
2807            let off_idx_i = &off_idx[i][0];
2808
2809            // test that we got the index matching the row group
2810            match pg_idx {
2811                ColumnIndexMetaData::INT32(int_idx) => {
2812                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2813                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2814                    assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2815                    assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2816                }
2817                _ => panic!("wrong stats type"),
2818            }
2819
2820            // check offset index matches too
2821            assert_eq!(
2822                off_idx_i.page_locations[0].offset,
2823                metadata.row_group(i).column(0).data_page_offset()
2824            );
2825        }
2826    }
2827
2828    #[test]
2829    fn test_reuse_schema() {
2830        let file = get_test_file("alltypes_plain.parquet");
2831        let file_reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
2832        let schema = file_reader.metadata().file_metadata().schema_descr_ptr();
2833        let expected = file_reader.metadata;
2834
2835        let options = ReadOptionsBuilder::new()
2836            .with_parquet_schema(schema)
2837            .build();
2838        let file_reader = SerializedFileReader::new_with_options(file, options).unwrap();
2839
2840        assert_eq!(expected.as_ref(), file_reader.metadata.as_ref());
2841        // Should have used the same schema instance
2842        assert!(Arc::ptr_eq(
2843            &expected.file_metadata().schema_descr_ptr(),
2844            &file_reader.metadata.file_metadata().schema_descr_ptr()
2845        ));
2846    }
2847
2848    #[test]
2849    fn test_read_unknown_logical_type() {
2850        let file = get_test_file("unknown-logical-type.parquet");
2851        let reader = SerializedFileReader::new(file).expect("Error opening file");
2852
2853        let schema = reader.metadata().file_metadata().schema_descr();
2854        assert_eq!(
2855            schema.column(0).logical_type_ref(),
2856            Some(&basic::LogicalType::String)
2857        );
2858        assert_eq!(
2859            schema.column(1).logical_type_ref(),
2860            Some(&basic::LogicalType::_Unknown { field_id: 2555 })
2861        );
2862        assert_eq!(schema.column(1).physical_type(), Type::BYTE_ARRAY);
2863
2864        let mut iter = reader
2865            .get_row_iter(None)
2866            .expect("Failed to create row iterator");
2867
2868        let mut num_rows = 0;
2869        while iter.next().is_some() {
2870            num_rows += 1;
2871        }
2872        assert_eq!(num_rows, reader.metadata().file_metadata().num_rows());
2873    }
2874}