parquet/file/serialized_reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains implementations of the reader traits `FileReader`, `RowGroupReader` and `PageReader`.
//! Also contains implementations of `ChunkReader` for files (with buffering) and byte arrays (RAM).
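//!
//! # Example
//!
//! A minimal sketch of the typical read path, assuming a file `data.parquet`
//! exists (the path is illustrative):
//!
//! ```no_run
//! use std::fs::File;
//!
//! use parquet::file::reader::{FileReader, SerializedFileReader};
//!
//! let file = File::open("data.parquet").unwrap();
//! let reader = SerializedFileReader::new(file).unwrap();
//! println!("row groups: {}", reader.metadata().num_row_groups());
//! for record in reader.get_row_iter(None).unwrap() {
//!     println!("{}", record.unwrap());
//! }
//! ```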

use crate::basic::{Encoding, Type};
use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::{
    metadata::*,
    properties::{ReaderProperties, ReaderPropertiesPtr},
    reader::*,
    statistics,
};
use crate::format::{PageHeader, PageLocation, PageType};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
#[cfg(feature = "encryption")]
use crate::thrift::TCompactSliceInputProtocol;
use crate::thrift::TSerializable;
use bytes::Bytes;
use std::collections::VecDeque;
use std::{fs::File, io::Read, path::Path, sync::Arc};
use thrift::protocol::TCompactInputProtocol;

impl TryFrom<File> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(file: File) -> Result<Self> {
        Self::new(file)
    }
}

impl TryFrom<&Path> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &Path) -> Result<Self> {
        let file = File::open(path)?;
        Self::try_from(file)
    }
}

impl TryFrom<String> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: String) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

impl TryFrom<&str> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &str) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

/// Conversion into a [`RowIter`]
/// using the full file schema over all row groups.
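///
/// A minimal sketch (assumes a file `data.parquet` exists; the path is
/// illustrative):
///
/// ```no_run
/// use parquet::file::serialized_reader::SerializedFileReader;
///
/// let reader = SerializedFileReader::try_from("data.parquet").unwrap();
/// for row in reader {
///     println!("{}", row.unwrap());
/// }
/// ```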
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        RowIter::from_file_into(Box::new(self))
    }
}

// ----------------------------------------------------------------------
// Implementations of file & row group readers

/// A serialized implementation for Parquet [`FileReader`].
pub struct SerializedFileReader<R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: Arc<ParquetMetaData>,
    props: ReaderPropertiesPtr,
}

/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;

/// A builder for [`ReadOptions`].
///
/// Predicates added to the builder are chained with AND when filtering
/// the row groups.
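///
/// A minimal sketch of filtering row groups and enabling the page index
/// (assumes a file `data.parquet` exists; the path is illustrative):
///
/// ```no_run
/// use std::fs::File;
///
/// use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};
///
/// let options = ReadOptionsBuilder::new()
///     .with_predicate(Box::new(|rg, _idx| rg.num_rows() > 0))
///     .with_page_index()
///     .build();
/// let file = File::open("data.parquet").unwrap();
/// let reader = SerializedFileReader::new_with_options(file, options).unwrap();
/// ```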
#[derive(Default)]
pub struct ReadOptionsBuilder {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: Option<ReaderProperties>,
}

impl ReadOptionsBuilder {
    /// New builder
    pub fn new() -> Self {
        Self::default()
    }

    /// Add a predicate on row group metadata to the read options.
    /// Only row groups matching the predicate will be scanned.
    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
        self.predicates.push(predicate);
        self
    }

    /// Add a predicate that keeps only row groups whose midpoints fall within
    /// the half-open byte range `[start, end)`, i.e. `{x | start <= x < end}`
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        assert!(start < end);
        let predicate = move |rg: &RowGroupMetaData, _: usize| {
            let mid = get_midpoint_offset(rg);
            mid >= start && mid < end
        };
        self.predicates.push(Box::new(predicate));
        self
    }

    /// Enable reading the page index structures described in
    /// "[Column Index] Layout to Support Page Skipping"
    ///
    /// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    pub fn with_page_index(mut self) -> Self {
        self.enable_page_index = true;
        self
    }

    /// Set the [`ReaderProperties`] configuration.
    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
        self.props = Some(properties);
        self
    }

    /// Seal the builder and return the read options
    pub fn build(self) -> ReadOptions {
        let props = self
            .props
            .unwrap_or_else(|| ReaderProperties::builder().build());
        ReadOptions {
            predicates: self.predicates,
            enable_page_index: self.enable_page_index,
            props,
        }
    }
}

/// A collection of options for reading a Parquet file.
///
/// Currently, only predicates on row group metadata are supported.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: ReaderProperties,
}

impl<R: 'static + ChunkReader> SerializedFileReader<R> {
    /// Creates a file reader from a Parquet file.
    /// Returns an error if the Parquet file does not exist or is corrupt.
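    ///
    /// A minimal sketch (assumes a file `data.parquet` exists; the path is
    /// illustrative):
    ///
    /// ```no_run
    /// use std::fs::File;
    ///
    /// use parquet::file::serialized_reader::SerializedFileReader;
    ///
    /// let file = File::open("data.parquet").unwrap();
    /// let reader = SerializedFileReader::new(file).unwrap();
    /// ```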
    pub fn new(chunk_reader: R) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
        let props = Arc::new(ReaderProperties::builder().build());
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props,
        })
    }

    /// Creates a file reader from a Parquet file with the given read options.
    /// Returns an error if the Parquet file does not exist or is corrupt.
    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
        let mut metadata_builder = ParquetMetaDataReader::new()
            .parse_and_finish(&chunk_reader)?
            .into_builder();
        let mut predicates = options.predicates;

        // Filter row groups based on the predicates
        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
            let mut keep = true;
            for predicate in &mut predicates {
                if !predicate(&rg_meta, i) {
                    keep = false;
                    break;
                }
            }
            if keep {
                metadata_builder = metadata_builder.add_row_group(rg_meta);
            }
        }

        let mut metadata = metadata_builder.build();

        // If page indexes are desired, build them with the filtered set of row groups
        if options.enable_page_index {
            let mut reader =
                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
            reader.read_page_indexes(&chunk_reader)?;
            metadata = reader.finish()?;
        }

        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props: Arc::new(options.props),
        })
    }
}

/// Get midpoint offset for a row group
fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
    let col = meta.column(0);
    let mut offset = col.data_page_offset();
    if let Some(dic_offset) = col.dictionary_page_offset() {
        if offset > dic_offset {
            offset = dic_offset
        }
    };
    offset + meta.compressed_size() / 2
}

impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
    fn metadata(&self) -> &ParquetMetaData {
        &self.metadata
    }

    fn num_row_groups(&self) -> usize {
        self.metadata.num_row_groups()
    }

    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
        let row_group_metadata = self.metadata.row_group(i);
        // Row groups should be processed sequentially.
        let props = Arc::clone(&self.props);
        let f = Arc::clone(&self.chunk_reader);
        Ok(Box::new(SerializedRowGroupReader::new(
            f,
            row_group_metadata,
            self.metadata.offset_index().map(|x| x[i].as_slice()),
            props,
        )?))
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_file(projection, self)
    }
}

/// A serialized implementation for Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: &'a RowGroupMetaData,
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    props: ReaderPropertiesPtr,
    bloom_filters: Vec<Option<Sbbf>>,
}

impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
    /// Creates a new row group reader from a file, row group metadata and custom config.
    pub fn new(
        chunk_reader: Arc<R>,
        metadata: &'a RowGroupMetaData,
        offset_index: Option<&'a [OffsetIndexMetaData]>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let bloom_filters = if props.read_bloom_filter() {
            metadata
                .columns()
                .iter()
                .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
                .collect::<Result<Vec<_>>>()?
        } else {
            std::iter::repeat_n(None, metadata.columns().len()).collect()
        };
        Ok(Self {
            chunk_reader,
            metadata,
            offset_index,
            props,
            bloom_filters,
        })
    }
}

impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
    fn metadata(&self) -> &RowGroupMetaData {
        self.metadata
    }

    fn num_columns(&self) -> usize {
        self.metadata.num_columns()
    }

    // TODO: fix PARQUET-816
    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
        let col = self.metadata.column(i);

        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());

        let props = Arc::clone(&self.props);
        Ok(Box::new(SerializedPageReader::new_with_properties(
            Arc::clone(&self.chunk_reader),
            col,
            usize::try_from(self.metadata.num_rows())?,
            page_locations,
            props,
        )?))
    }

    /// Get the bloom filter for the `i`th column, if present.
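    ///
    /// A minimal sketch of probing the filter (assumes bloom filter reading was
    /// enabled via [`ReaderProperties`] and that column `0` stores `i32` values;
    /// both are illustrative):
    ///
    /// ```ignore
    /// if let Some(sbbf) = row_group_reader.get_column_bloom_filter(0) {
    ///     // A hit may be a false positive, but a miss is definitive
    ///     let maybe_present = sbbf.check(&42_i32);
    /// }
    /// ```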
    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
        self.bloom_filters[i].as_ref()
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_row_group(projection, self)
    }
}

/// Decodes a [`Page`] from the provided `buffer`
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the 32-bit CRC checksum of the page
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // When processing a data page v2, depending on whether compression is
    // enabled for the page, we must account for the uncompressed prefix
    // ('offset') containing the repetition and definition levels.
    //
    // For all page types other than v2 the offset is always 0; the `true` flag
    // means decompression is applied whenever a decompressor is provided.
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                    for definition_levels_byte_length ({}) \
                    and repetition_levels_byte_length ({}) \
                    given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        // When the is_compressed flag is missing, the page is considered compressed
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    // TODO: page header could be huge because of statistics. We should set a
    // maximum page header size and abort if that is exceeded.
    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
            if decompressed_size > 0 {
                let compressed = &buffer.as_ref()[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        _ => buffer,
    };

    let result = match page_header.type_ {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: Encoding::try_from(dict_header.encoding)?,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: Encoding::try_from(header.encoding)?,
                def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
                rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
                statistics: statistics::from_thrift(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: Encoding::try_from(header.encoding)?,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift(physical_type, header.statistics)?,
            }
        }
        _ => {
            // Unknown page types (e.g., INDEX_PAGE) are skipped in the read path
            // rather than decoded, so this arm is not expected to be reached.
            unimplemented!("Page type {:?} is not supported", page_header.type_)
        }
    };

    Ok(result)
}

enum SerializedPageReaderState {
    Values {
        /// The current byte offset in the reader
        /// Note that offset is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        offset: u64,

        /// The length of the chunk in bytes
        /// Note that remaining_bytes is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        remaining_bytes: u64,

        /// If the next page header has already been "peeked", it is cached here
        next_page_header: Option<Box<PageHeader>>,

        /// The index of the data page within this column chunk
        page_index: usize,

        /// Whether the next page is expected to be a dictionary page
        require_dictionary: bool,
    },
    Pages {
        /// Remaining page locations
        page_locations: VecDeque<PageLocation>,
        /// Remaining dictionary location if any
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
        /// The index of the data page within this column chunk
        page_index: usize,
    },
}

#[derive(Default)]
struct SerializedPageReaderContext {
    /// Crypto context carrying objects required for decryption
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}

/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader
    reader: Arc<R>,

    /// The compression codec for this column chunk. Only set when the codec is not `UNCOMPRESSED`.
    decompressor: Option<Box<dyn Codec>>,

    /// Column chunk type.
    physical_type: Type,

    state: SerializedPageReaderState,

    context: SerializedPageReaderContext,
}

impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and metadata
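    ///
    /// A minimal sketch of reading the pages of the first column of the first
    /// row group (assumes a file `data.parquet` exists; the path is illustrative):
    ///
    /// ```no_run
    /// use std::{fs::File, sync::Arc};
    ///
    /// use parquet::file::reader::{FileReader, SerializedFileReader};
    /// use parquet::file::serialized_reader::SerializedPageReader;
    ///
    /// let reader = SerializedFileReader::new(File::open("data.parquet").unwrap()).unwrap();
    /// let rg_meta = reader.metadata().row_group(0);
    /// let pages = SerializedPageReader::new(
    ///     Arc::new(File::open("data.parquet").unwrap()),
    ///     rg_meta.column(0),
    ///     rg_meta.num_rows() as usize,
    ///     None,
    /// )
    /// .unwrap();
    /// for page in pages {
    ///     println!("page: {:?}", page.unwrap().page_type());
    /// }
    /// ```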
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// No-op stub used when encryption is disabled.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Adds any necessary crypto context to this page reader, if encryption is enabled.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.context.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page reader from a chunk reader, column chunk metadata and custom properties.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // If the offset of the first page doesn't match the start of the column chunk
                // then the preceding space must contain a dictionary page.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start,
                remaining_bytes: len,
                next_page_header: None,
                page_index: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context: Default::default(),
        })
    }

    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
    /// Unlike page metadata, an offset can uniquely identify a page.
    ///
    /// This is used when reading Parquet with a row filter, where we don't want to decompress a page twice.
    /// It allows us to check whether the next page has already been cached or read.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as u64))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as u64))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn read_page_header_len<T: Read>(
        context: &SerializedPageReaderContext,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
        struct TrackedRead<R> {
            inner: R,
            bytes_read: usize,
        }

        impl<R: Read> Read for TrackedRead<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let v = self.inner.read(buf)?;
                self.bytes_read += v;
                Ok(v)
            }
        }

        let mut tracked = TrackedRead {
            inner: input,
            bytes_read: 0,
        };
        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
        Ok((tracked.bytes_read, header))
    }

    fn read_page_header_len_from_bytes(
        context: &SerializedPageReaderContext,
        buffer: &[u8],
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        let mut input = std::io::Cursor::new(buffer);
        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
        let header_len = input.position() as usize;
        Ok((header_len, header))
    }
}

#[cfg(not(feature = "encryption"))]
impl SerializedPageReaderContext {
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        _page_index: usize,
        _dictionary_page: bool,
    ) -> Result<PageHeader> {
        let mut prot = TCompactInputProtocol::new(input);
        Ok(PageHeader::read_from_in_protocol(&mut prot)?)
    }

    fn decrypt_page_data<T>(
        &self,
        buffer: T,
        _page_index: usize,
        _dictionary_page: bool,
    ) -> Result<T> {
        Ok(buffer)
    }
}

#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                let mut prot = TCompactInputProtocol::new(input);
                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
            }
        }
    }

    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}

impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
    type Item = Result<Page>;

    fn next(&mut self) -> Option<Self::Item> {
        self.get_next_page().transpose()
    }
}

fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
    if header_len as u64 > remaining_bytes {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

fn verify_page_size(
    compressed_size: i32,
    uncompressed_size: i32,
    remaining_bytes: u64,
) -> Result<()> {
    // The page's compressed size should not exceed the remaining bytes that are
    // available to read. The page's uncompressed size is the expected size
    // after decompression, which can never be negative.
    if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset)?;
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len as u64;
                        *remaining -= header_len as u64;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    *offset += data_len as u64;
                    *remaining -= data_len as u64;

                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The next page header has already been peeked, so just advance the offset
                    *offset += buffered_header.compressed_page_size as u64;
                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
                } else {
                    let mut read = self.reader.get_read(*offset)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    let data_page_size = header.compressed_page_size as u64;
                    *offset += header_len as u64 + data_page_size;
                    *remaining_bytes -= header_len as u64 + data_page_size;
                }
                if *require_dictionary {
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // If a dictionary page exists, consume it by taking it (sets to None)
                    dictionary_page.take();
                } else {
                    // If no dictionary page exists, simply pop the data page from page_locations
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Buf;

    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use crate::format::BoundaryOrder;

    use crate::basic::{self, ColumnOrder, SortOrder};
    use crate::column::reader::ColumnReader;
    use crate::data_type::private::ParquetValueType;
    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
    use crate::file::page_index::index::{Index, NativeIndex};
    #[allow(deprecated)]
    use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
    use crate::file::writer::SerializedFileWriter;
    use crate::record::RowAccessor;
    use crate::schema::parser::parse_message_type;
    use crate::util::test_common::file_util::{get_test_file, get_test_path};

    use super::*;

    #[test]
    fn test_cursor_and_file_has_the_same_behaviour() {
        let mut buf: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut buf)
            .unwrap();
        let cursor = Bytes::from(buf);
        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_from_file = SerializedFileReader::new(test_file).unwrap();

        let file_iter = read_from_file.get_row_iter(None).unwrap();
        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();

        for (a, b) in file_iter.zip(cursor_iter) {
            assert_eq!(a.unwrap(), b.unwrap())
        }
    }

    #[test]
    fn test_file_reader_try_from() {
        // Valid file path
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        // Invalid file path
        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }

    #[test]
    fn test_file_reader_into_iter() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let iter = reader.into_iter();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[test]
    fn test_file_reader_into_iter_project() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let schema = "message schema { OPTIONAL INT32 id; }";
        let proj = parse_message_type(schema).ok();
        let iter = reader.into_iter().project(proj).unwrap();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[test]
    fn test_reuse_file_chunk() {
        // This test covers the case of maintaining the correct start position in a file
        // stream for each column reader after initializing and moving to the next one
        // (without necessarily reading the entire column).
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file).unwrap();
        let row_group = reader.get_row_group(0).unwrap();

        let mut page_readers = Vec::new();
        for i in 0..row_group.num_columns() {
            page_readers.push(row_group.get_column_page_reader(i).unwrap());
        }

        // Now buffer each column reader; we do not expect any failures like:
        // General("underlying Thrift error: end of file")
        for mut page_reader in page_readers {
            assert!(page_reader.get_next_page().is_ok());
        }
    }

    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Test contents in row group metadata
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    #[test]
    fn test_file_reader_empty_compressed_datapage_v2() {
1410        // this file has a compressed data page that decompresses to 0 bytes
1411        let test_file = get_test_file("page_v2_empty_compressed.parquet");
1412        let reader_result = SerializedFileReader::new(test_file);
1413        assert!(reader_result.is_ok());
1414        let reader = reader_result.unwrap();
1415
1416        // Test contents in Parquet metadata
1417        let metadata = reader.metadata();
1418        assert_eq!(metadata.num_row_groups(), 1);
1419
1420        // Test contents in file metadata
1421        let file_metadata = metadata.file_metadata();
1422        assert!(file_metadata.created_by().is_some());
1423        assert_eq!(
1424            file_metadata.created_by().unwrap(),
1425            "parquet-cpp-arrow version 14.0.2"
1426        );
1427        assert!(file_metadata.key_value_metadata().is_some());
1428        assert_eq!(
1429            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1430            1
1431        );
1432
1433        assert_eq!(file_metadata.num_rows(), 10);
1434        assert_eq!(file_metadata.version(), 2);
1435        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1436        assert_eq!(
1437            file_metadata.column_orders(),
1438            Some(vec![expected_order].as_ref())
1439        );
1440
1441        let row_group_metadata = metadata.row_group(0);
1442
1443        // Check each column order
1444        for i in 0..row_group_metadata.num_columns() {
1445            assert_eq!(file_metadata.column_order(i), expected_order);
1446        }
1447
1448        // Test row group reader
1449        let row_group_reader_result = reader.get_row_group(0);
1450        assert!(row_group_reader_result.is_ok());
1451        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1452        assert_eq!(
1453            row_group_reader.num_columns(),
1454            row_group_metadata.num_columns()
1455        );
1456        assert_eq!(
1457            row_group_reader.metadata().total_byte_size(),
1458            row_group_metadata.total_byte_size()
1459        );
1460
1461        // Test page readers
1462        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1463        assert!(page_reader_0_result.is_ok());
1464        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1465        let mut page_count = 0;
1466        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1467            let is_expected_page = match page {
1468                Page::DictionaryPage {
1469                    buf,
1470                    num_values,
1471                    encoding,
1472                    is_sorted,
1473                } => {
1474                    assert_eq!(buf.len(), 0);
1475                    assert_eq!(num_values, 0);
1476                    assert_eq!(encoding, Encoding::PLAIN);
1477                    assert!(!is_sorted);
1478                    true
1479                }
1480                Page::DataPageV2 {
1481                    buf,
1482                    num_values,
1483                    encoding,
1484                    num_nulls,
1485                    num_rows,
1486                    def_levels_byte_len,
1487                    rep_levels_byte_len,
1488                    is_compressed,
1489                    statistics,
1490                } => {
1491                    assert_eq!(buf.len(), 3);
1492                    assert_eq!(num_values, 10);
1493                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1494                    assert_eq!(num_nulls, 10);
1495                    assert_eq!(num_rows, 10);
1496                    assert_eq!(def_levels_byte_len, 2);
1497                    assert_eq!(rep_levels_byte_len, 0);
1498                    assert!(is_compressed);
1499                    assert!(statistics.is_some());
1500                    true
1501                }
1502                _ => false,
1503            };
1504            assert!(is_expected_page);
1505            page_count += 1;
1506        }
1507        assert_eq!(page_count, 2);
1508    }
1509
1510    #[test]
1511    fn test_file_reader_empty_datapage_v2() {
1512        // this file has a 0-byte compressed data page that decompresses to 0 bytes
1513        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
1514        let reader_result = SerializedFileReader::new(test_file);
1515        assert!(reader_result.is_ok());
1516        let reader = reader_result.unwrap();
1517
1518        // Test contents in Parquet metadata
1519        let metadata = reader.metadata();
1520        assert_eq!(metadata.num_row_groups(), 1);
1521
1522        // Test contents in file metadata
1523        let file_metadata = metadata.file_metadata();
1524        assert!(file_metadata.created_by().is_some());
1525        assert_eq!(
1526            file_metadata.created_by().unwrap(),
1527            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
1528        );
1529        assert!(file_metadata.key_value_metadata().is_some());
1530        assert_eq!(
1531            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1532            2
1533        );
1534
1535        assert_eq!(file_metadata.num_rows(), 1);
1536        assert_eq!(file_metadata.version(), 1);
1537        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1538        assert_eq!(
1539            file_metadata.column_orders(),
1540            Some(vec![expected_order].as_ref())
1541        );
1542
1543        let row_group_metadata = metadata.row_group(0);
1544
1545        // Check each column order
1546        for i in 0..row_group_metadata.num_columns() {
1547            assert_eq!(file_metadata.column_order(i), expected_order);
1548        }
1549
1550        // Test row group reader
1551        let row_group_reader_result = reader.get_row_group(0);
1552        assert!(row_group_reader_result.is_ok());
1553        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1554        assert_eq!(
1555            row_group_reader.num_columns(),
1556            row_group_metadata.num_columns()
1557        );
1558        assert_eq!(
1559            row_group_reader.metadata().total_byte_size(),
1560            row_group_metadata.total_byte_size()
1561        );
1562
1563        // Test page readers
1564        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1565        assert!(page_reader_0_result.is_ok());
1566        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1567        let mut page_count = 0;
1568        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1569            let is_expected_page = match page {
1570                Page::DataPageV2 {
1571                    buf,
1572                    num_values,
1573                    encoding,
1574                    num_nulls,
1575                    num_rows,
1576                    def_levels_byte_len,
1577                    rep_levels_byte_len,
1578                    is_compressed,
1579                    statistics,
1580                } => {
1581                    assert_eq!(buf.len(), 2);
1582                    assert_eq!(num_values, 1);
1583                    assert_eq!(encoding, Encoding::PLAIN);
1584                    assert_eq!(num_nulls, 1);
1585                    assert_eq!(num_rows, 1);
1586                    assert_eq!(def_levels_byte_len, 2);
1587                    assert_eq!(rep_levels_byte_len, 0);
1588                    assert!(is_compressed);
1589                    assert!(statistics.is_none());
1590                    true
1591                }
1592                _ => false,
1593            };
1594            assert!(is_expected_page);
1595            page_count += 1;
1596        }
1597        assert_eq!(page_count, 1);
1598    }
1599
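    /// Builds a `SerializedPageReader` for the given row group and column of
    /// `file_reader`, forwarding the reader properties and any offset index so
    /// that tests can drive the page reader directly.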
1600    fn get_serialized_page_reader<R: ChunkReader>(
1601        file_reader: &SerializedFileReader<R>,
1602        row_group: usize,
1603        column: usize,
1604    ) -> Result<SerializedPageReader<R>> {
1605        let row_group = {
1606            let row_group_metadata = file_reader.metadata.row_group(row_group);
1607            let props = Arc::clone(&file_reader.props);
1608            let f = Arc::clone(&file_reader.chunk_reader);
1609            SerializedRowGroupReader::new(
1610                f,
1611                row_group_metadata,
1612                file_reader
1613                    .metadata
1614                    .offset_index()
1615                    .map(|x| x[row_group].as_slice()),
1616                props,
1617            )?
1618        };
1619
1620        let col = row_group.metadata.column(column);
1621
1622        let page_locations = row_group
1623            .offset_index
1624            .map(|x| x[column].page_locations.clone());
1625
1626        let props = Arc::clone(&row_group.props);
1627        SerializedPageReader::new_with_properties(
1628            Arc::clone(&row_group.chunk_reader),
1629            col,
1630            usize::try_from(row_group.metadata.num_rows())?,
1631            page_locations,
1632            props,
1633        )
1634    }
1635
1636    #[test]
1637    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
1638        let test_file = get_test_file("alltypes_plain.parquet");
1639        let reader = SerializedFileReader::new(test_file)?;
1640
1641        let mut offset_set = HashSet::new();
1642        let num_row_groups = reader.metadata.num_row_groups();
1643        for row_group in 0..num_row_groups {
1644            let num_columns = reader.metadata.row_group(row_group).num_columns();
1645            for column in 0..num_columns {
1646                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;
1647
1648                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
1649                    match &page_reader.state {
1650                        SerializedPageReaderState::Pages {
1651                            page_locations,
1652                            dictionary_page,
1653                            ..
1654                        } => {
1655                            if let Some(page) = dictionary_page {
1656                                assert_eq!(page.offset as u64, page_offset);
1657                            } else if let Some(page) = page_locations.front() {
1658                                assert_eq!(page.offset as u64, page_offset);
1659                            } else {
1660                                unreachable!()
1661                            }
1662                        }
1663                        SerializedPageReaderState::Values {
1664                            offset,
1665                            next_page_header,
1666                            ..
1667                        } => {
1668                            assert!(next_page_header.is_some());
1669                            assert_eq!(*offset, page_offset);
1670                        }
1671                    }
1672                    let page = page_reader.get_next_page()?;
1673                    assert!(page.is_some());
1674                    let newly_inserted = offset_set.insert(page_offset);
1675                    assert!(newly_inserted);
1676                }
1677            }
1678        }
1679
1680        Ok(())
1681    }
1682
1683    #[test]
1684    fn test_page_iterator() {
1685        let file = get_test_file("alltypes_plain.parquet");
1686        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1687
1688        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1689
1690        // read first page
1691        let page = page_iterator.next();
1692        assert!(page.is_some());
1693        assert!(page.unwrap().is_ok());
1694
1695        // reach end of file
1696        let page = page_iterator.next();
1697        assert!(page.is_none());
1698
1699        let row_group_indices = Box::new(0..1);
1700        let mut page_iterator =
1701            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1702
1703        // read first page
1704        let page = page_iterator.next();
1705        assert!(page.is_some());
1706        assert!(page.unwrap().is_ok());
1707
1708        // reach end of file
1709        let page = page_iterator.next();
1710        assert!(page.is_none());
1711    }
1712
1713    #[test]
1714    fn test_file_reader_key_value_metadata() {
1715        let file = get_test_file("binary.parquet");
1716        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1717
1718        let metadata = file_reader
1719            .metadata
1720            .file_metadata()
1721            .key_value_metadata()
1722            .unwrap();
1723
1724        assert_eq!(metadata.len(), 3);
1725
1726        assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1727
1728        assert_eq!(metadata[1].key, "writer.model.name");
1729        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1730
1731        assert_eq!(metadata[2].key, "parquet.proto.class");
1732        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1733    }
1734
1735    #[test]
1736    fn test_file_reader_optional_metadata() {
1737        // file with optional metadata: bloom filters, encoding stats, column index and offset index.
1738        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1739        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1740
1741        let row_group_metadata = file_reader.metadata.row_group(0);
1742        let col0_metadata = row_group_metadata.column(0);
1743
1744        // test optional bloom filter offset
1745        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1746
1747        // test page encoding stats
1748        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1749
1750        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1751        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1752        assert_eq!(page_encoding_stats.count, 1);
1753
1754        // test optional column index offset
1755        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1756        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1757
1758        // test optional offset index offset
1759        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1760        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1761    }
1762
1763    #[test]
1764    fn test_file_reader_with_no_filter() -> Result<()> {
1765        let test_file = get_test_file("alltypes_plain.parquet");
1766        let origin_reader = SerializedFileReader::new(test_file)?;
1767        // test initial number of row groups
1768        let metadata = origin_reader.metadata();
1769        assert_eq!(metadata.num_row_groups(), 1);
1770        Ok(())
1771    }
1772
1773    #[test]
1774    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1775        let test_file = get_test_file("alltypes_plain.parquet");
1776        let read_options = ReadOptionsBuilder::new()
1777            .with_predicate(Box::new(|_, _| false))
1778            .build();
1779        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1780        let metadata = reader.metadata();
1781        assert_eq!(metadata.num_row_groups(), 0);
1782        Ok(())
1783    }
1784
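    // A small companion sketch: the predicate closure receives the row group's
    // metadata and its index, so filters can inspect fields such as
    // `num_rows()` instead of returning a constant.
    #[test]
    fn test_file_reader_filter_row_groups_with_metadata_predicate() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let read_options = ReadOptionsBuilder::new()
            // keep only non-empty row groups
            .with_predicate(Box::new(|rg, _| rg.num_rows() > 0))
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        // alltypes_plain.parquet has a single, non-empty row group, so it is kept
        assert_eq!(reader.metadata().num_row_groups(), 1);
        Ok(())
    }
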
1785    #[test]
1786    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1787        let test_file = get_test_file("alltypes_plain.parquet");
1788        let origin_reader = SerializedFileReader::new(test_file)?;
1789        // test initial number of row groups
1790        let metadata = origin_reader.metadata();
1791        assert_eq!(metadata.num_row_groups(), 1);
1792        let mid = get_midpoint_offset(metadata.row_group(0));
1793
1794        let test_file = get_test_file("alltypes_plain.parquet");
1795        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1796        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1797        let metadata = reader.metadata();
1798        assert_eq!(metadata.num_row_groups(), 1);
1799
1800        let test_file = get_test_file("alltypes_plain.parquet");
1801        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1802        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1803        let metadata = reader.metadata();
1804        assert_eq!(metadata.num_row_groups(), 0);
1805        Ok(())
1806    }
1807
1808    #[test]
1809    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1810        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1811        let origin_reader = SerializedFileReader::new(test_file)?;
1812        let metadata = origin_reader.metadata();
1813        let mid = get_midpoint_offset(metadata.row_group(0));
1814
1815        // true, true predicate
1816        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1817        let read_options = ReadOptionsBuilder::new()
1818            .with_page_index()
1819            .with_predicate(Box::new(|_, _| true))
1820            .with_range(mid, mid + 1)
1821            .build();
1822        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1823        let metadata = reader.metadata();
1824        assert_eq!(metadata.num_row_groups(), 1);
1825        assert_eq!(metadata.column_index().unwrap().len(), 1);
1826        assert_eq!(metadata.offset_index().unwrap().len(), 1);
1827
1828        // true, false predicate
1829        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1830        let read_options = ReadOptionsBuilder::new()
1831            .with_page_index()
1832            .with_predicate(Box::new(|_, _| true))
1833            .with_range(0, mid)
1834            .build();
1835        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1836        let metadata = reader.metadata();
1837        assert_eq!(metadata.num_row_groups(), 0);
1838        assert!(metadata.column_index().is_none());
1839        assert!(metadata.offset_index().is_none());
1840
1841        // false, true predicate
1842        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1843        let read_options = ReadOptionsBuilder::new()
1844            .with_page_index()
1845            .with_predicate(Box::new(|_, _| false))
1846            .with_range(mid, mid + 1)
1847            .build();
1848        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1849        let metadata = reader.metadata();
1850        assert_eq!(metadata.num_row_groups(), 0);
1851        assert!(metadata.column_index().is_none());
1852        assert!(metadata.offset_index().is_none());
1853
1854        // false, false predicate
1855        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1856        let read_options = ReadOptionsBuilder::new()
1857            .with_page_index()
1858            .with_predicate(Box::new(|_, _| false))
1859            .with_range(0, mid)
1860            .build();
1861        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1862        let metadata = reader.metadata();
1863        assert_eq!(metadata.num_row_groups(), 0);
1864        assert!(metadata.column_index().is_none());
1865        assert!(metadata.offset_index().is_none());
1866        Ok(())
1867    }
1868
1869    #[test]
1870    fn test_file_reader_invalid_metadata() {
1871        let data = [
1872            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1873            80, 65, 82, 49,
1874        ];
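        // the slice ends with a syntactically valid footer (metadata length 16,
        // little-endian, followed by the `PAR1` magic), so the failure below
        // comes from parsing the corrupt metadata bytes themselves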
1875        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1876        assert_eq!(
1877            ret.err().unwrap().to_string(),
1878            "Parquet error: Could not parse metadata: bad data"
1879        );
1880    }
1881
1882    #[test]
1883    // The page index info below was obtained with java parquet-tools:
1884    // ```
1885    // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
1886    // row group 0:
1887    // column index for column String:
1888    // Boundary order: ASCENDING
1889    // page-0  :
1890    // null count                 min                                  max
1891    // 0                          Hello                                today
1892    //
1893    // offset index for column String:
1894    // page-0   :
1895    // offset   compressed size       first row index
1896    // 4               152                     0
1897    // ```
1898    //
1899    fn test_page_index_reader() {
1900        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
1901        let builder = ReadOptionsBuilder::new();
1902        // enable reading the page index
1903        let options = builder.with_page_index().build();
1904        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1905        let reader = reader_result.unwrap();
1906
1907        // Test contents in Parquet metadata
1908        let metadata = reader.metadata();
1909        assert_eq!(metadata.num_row_groups(), 1);
1910
1911        let column_index = metadata.column_index().unwrap();
1912
1913        // only one row group
1914        assert_eq!(column_index.len(), 1);
1915        let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
1916            index
1917        } else {
1918            unreachable!()
1919        };
1920
1921        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
1922        let index_in_pages = &index.indexes;
1923
1924        // only one page in the column index
1925        assert_eq!(index_in_pages.len(), 1);
1926
1927        let page0 = &index_in_pages[0];
1928        let min = page0.min.as_ref().unwrap();
1929        let max = page0.max.as_ref().unwrap();
1930        assert_eq!(b"Hello", min.as_bytes());
1931        assert_eq!(b"today", max.as_bytes());
1932
1933        let offset_indexes = metadata.offset_index().unwrap();
1934        // only one row group
1935        assert_eq!(offset_indexes.len(), 1);
1936        let offset_index = &offset_indexes[0];
1937        let page_offset = &offset_index[0].page_locations()[0];
1938
1939        assert_eq!(4, page_offset.offset);
1940        assert_eq!(152, page_offset.compressed_page_size);
1941        assert_eq!(0, page_offset.first_row_index);
1942    }
1943
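    // Illustrative sketch building on the test above: with the page index
    // enabled, the offset index holds one entry per row group and per column,
    // and page locations within a column chunk appear in file-offset order.
    #[test]
    fn test_offset_index_page_locations_are_ordered() {
        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
        let metadata = reader.metadata();

        for (rg, columns) in metadata.offset_index().unwrap().iter().enumerate() {
            // one offset index entry per column in the row group
            assert_eq!(columns.len(), metadata.row_group(rg).num_columns());
            for column in columns {
                // pages are laid out sequentially, so offsets must increase
                for w in column.page_locations().windows(2) {
                    assert!(w[0].offset < w[1].offset);
                }
            }
        }
    }
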
1944    #[test]
1945    #[allow(deprecated)]
1946    fn test_page_index_reader_out_of_order() {
1947        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1948        let options = ReadOptionsBuilder::new().with_page_index().build();
1949        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
1950        let metadata = reader.metadata();
1951
1952        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1953        let columns = metadata.row_group(0).columns();
1954        let reversed: Vec<_> = columns.iter().cloned().rev().collect();
1955
1956        let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
1957        let mut b = read_columns_indexes(&test_file, &reversed)
1958            .unwrap()
1959            .unwrap();
1960        b.reverse();
1961        assert_eq!(a, b);
1962
1963        let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
1964        let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
1965        b.reverse();
1966        assert_eq!(a, b);
1967    }
1968
1969    #[test]
1970    fn test_page_index_reader_all_type() {
1971        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1972        let builder = ReadOptionsBuilder::new();
1973        // enable reading the page index
1974        let options = builder.with_page_index().build();
1975        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1976        let reader = reader_result.unwrap();
1977
1978        // Test contents in Parquet metadata
1979        let metadata = reader.metadata();
1980        assert_eq!(metadata.num_row_groups(), 1);
1981
1982        let column_index = metadata.column_index().unwrap();
1983        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
1984
1985        // only one row group
1986        assert_eq!(column_index.len(), 1);
1987        let row_group_metadata = metadata.row_group(0);
1988
1989        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
1990        assert!(!&column_index[0][0].is_sorted());
1991        let boundary_order = &column_index[0][0].get_boundary_order();
1992        assert!(boundary_order.is_some());
1993        assert!(matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED));
1994        if let Index::INT32(index) = &column_index[0][0] {
1995            check_native_page_index(
1996                index,
1997                325,
1998                get_row_group_min_max_bytes(row_group_metadata, 0),
1999                BoundaryOrder::UNORDERED,
2000            );
2001            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2002        } else {
2003            unreachable!()
2004        };
2005        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
2006        assert!(&column_index[0][1].is_sorted());
2007        if let Index::BOOLEAN(index) = &column_index[0][1] {
2008            assert_eq!(index.indexes.len(), 82);
2009            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2010        } else {
2011            unreachable!()
2012        };
2013        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2014        assert!(&column_index[0][2].is_sorted());
2015        if let Index::INT32(index) = &column_index[0][2] {
2016            check_native_page_index(
2017                index,
2018                325,
2019                get_row_group_min_max_bytes(row_group_metadata, 2),
2020                BoundaryOrder::ASCENDING,
2021            );
2022            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2023        } else {
2024            unreachable!()
2025        };
2026        //col3->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2027        assert!(&column_index[0][3].is_sorted());
2028        if let Index::INT32(index) = &column_index[0][3] {
2029            check_native_page_index(
2030                index,
2031                325,
2032                get_row_group_min_max_bytes(row_group_metadata, 3),
2033                BoundaryOrder::ASCENDING,
2034            );
2035            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2036        } else {
2037            unreachable!()
2038        };
2039        //col4->int_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2040        assert!(&column_index[0][4].is_sorted());
2041        if let Index::INT32(index) = &column_index[0][4] {
2042            check_native_page_index(
2043                index,
2044                325,
2045                get_row_group_min_max_bytes(row_group_metadata, 4),
2046                BoundaryOrder::ASCENDING,
2047            );
2048            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2049        } else {
2050            unreachable!()
2051        };
2052        //col5->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
2053        assert!(!&column_index[0][5].is_sorted());
2054        if let Index::INT64(index) = &column_index[0][5] {
2055            check_native_page_index(
2056                index,
2057                528,
2058                get_row_group_min_max_bytes(row_group_metadata, 5),
2059                BoundaryOrder::UNORDERED,
2060            );
2061            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2062        } else {
2063            unreachable!()
2064        };
2065        //col6->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
2066        assert!(&column_index[0][6].is_sorted());
2067        if let Index::FLOAT(index) = &column_index[0][6] {
2068            check_native_page_index(
2069                index,
2070                325,
2071                get_row_group_min_max_bytes(row_group_metadata, 6),
2072                BoundaryOrder::ASCENDING,
2073            );
2074            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2075        } else {
2076            unreachable!()
2077        };
2078        //col7->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
2079        assert!(!&column_index[0][7].is_sorted());
2080        if let Index::DOUBLE(index) = &column_index[0][7] {
2081            check_native_page_index(
2082                index,
2083                528,
2084                get_row_group_min_max_bytes(row_group_metadata, 7),
2085                BoundaryOrder::UNORDERED,
2086            );
2087            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2088        } else {
2089            unreachable!()
2090        };
2091        //col8->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
2092        assert!(!&column_index[0][8].is_sorted());
2093        if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
2094            check_native_page_index(
2095                index,
2096                974,
2097                get_row_group_min_max_bytes(row_group_metadata, 8),
2098                BoundaryOrder::UNORDERED,
2099            );
2100            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2101        } else {
2102            unreachable!()
2103        };
2104        //col9->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2105        assert!(&column_index[0][9].is_sorted());
2106        if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
2107            check_native_page_index(
2108                index,
2109                352,
2110                get_row_group_min_max_bytes(row_group_metadata, 9),
2111                BoundaryOrder::ASCENDING,
2112            );
2113            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2114        } else {
2115            unreachable!()
2116        };
2117        //col10->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
2118        // Note: per-page min/max values for this column do not exist.
2119        assert!(!&column_index[0][10].is_sorted());
2120        if let Index::NONE = &column_index[0][10] {
2121            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2122        } else {
2123            unreachable!()
2124        };
2125        //col11->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
2126        assert!(&column_index[0][11].is_sorted());
2127        if let Index::INT32(index) = &column_index[0][11] {
2128            check_native_page_index(
2129                index,
2130                325,
2131                get_row_group_min_max_bytes(row_group_metadata, 11),
2132                BoundaryOrder::ASCENDING,
2133            );
2134            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2135        } else {
2136            unreachable!()
2137        };
2138        //col12->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
2139        assert!(!&column_index[0][12].is_sorted());
2140        if let Index::INT32(index) = &column_index[0][12] {
2141            check_native_page_index(
2142                index,
2143                325,
2144                get_row_group_min_max_bytes(row_group_metadata, 12),
2145                BoundaryOrder::UNORDERED,
2146            );
2147            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2148        } else {
2149            unreachable!()
2150        };
2151    }
2152
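    /// Asserts that a native page index has the expected number of page
    /// entries and boundary order, and that every page-level min/max lies
    /// within the row group's overall min/max statistics.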
2153    fn check_native_page_index<T: ParquetValueType>(
2154        row_group_index: &NativeIndex<T>,
2155        page_size: usize,
2156        min_max: (&[u8], &[u8]),
2157        boundary_order: BoundaryOrder,
2158    ) {
2159        assert_eq!(row_group_index.indexes.len(), page_size);
2160        assert_eq!(row_group_index.boundary_order, boundary_order);
2161        assert!(row_group_index.indexes.iter().all(|x| {
2162            x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
2163                && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
2164        }));
2165    }
2166
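    /// Returns the raw (min, max) statistics bytes for the given column of a
    /// row group, defaulting to empty slices when a bound is absent.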
2167    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2168        let statistics = r.column(col_num).statistics().unwrap();
2169        (
2170            statistics.min_bytes_opt().unwrap_or_default(),
2171            statistics.max_bytes_opt().unwrap_or_default(),
2172        )
2173    }
2174
2175    #[test]
2176    fn test_skip_next_page_with_dictionary_page() {
2177        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2178        let builder = ReadOptionsBuilder::new();
2179        // enable reading the page index
2180        let options = builder.with_page_index().build();
2181        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2182        let reader = reader_result.unwrap();
2183
2184        let row_group_reader = reader.get_row_group(0).unwrap();
2185
2186        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
2187        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2188
2189        let mut vec = vec![];
2190
2191        // Step 1: Peek and ensure dictionary page is correctly identified
2192        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2193        assert!(meta.is_dict);
2194
2195        // Step 2: Call skip_next_page to skip the dictionary page
2196        column_page_reader.skip_next_page().unwrap();
2197
2198        // Step 3: Read the next data page after skipping the dictionary page
2199        let page = column_page_reader.get_next_page().unwrap().unwrap();
2200        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2201
2202        // Step 4: Continue reading remaining data pages and verify correctness
2203        for _i in 0..351 {
2204            // of the 352 data pages, one was read above and the dictionary page was skipped
2205            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2206            assert!(!meta.is_dict); // Verify no dictionary page here
2207            vec.push(meta);
2208
2209            let page = column_page_reader.get_next_page().unwrap().unwrap();
2210            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2211        }
2212
2213        // Step 5: Check if all pages are read
2214        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2215        assert!(column_page_reader.get_next_page().unwrap().is_none());
2216
2217        // Step 6: Verify the number of data pages read (should be 351 data pages)
2218        assert_eq!(vec.len(), 351);
2219    }
2220
2221    #[test]
2222    fn test_skip_page_with_offset_index() {
2223        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2224        let builder = ReadOptionsBuilder::new();
2225        // enable reading the page index
2226        let options = builder.with_page_index().build();
2227        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2228        let reader = reader_result.unwrap();
2229
2230        let row_group_reader = reader.get_row_group(0).unwrap();
2231
2232        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
2233        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2234
2235        let mut vec = vec![];
2236
2237        for i in 0..325 {
2238            if i % 2 == 0 {
2239                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2240            } else {
2241                column_page_reader.skip_next_page().unwrap();
2242            }
2243        }
2244        // check that all pages were read
2245        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2246        assert!(column_page_reader.get_next_page().unwrap().is_none());
2247
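        // 163 = ceil(325 / 2): the pages at even indexes 0, 2, ..., 324 were read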
2248        assert_eq!(vec.len(), 163);
2249    }
2250
2251    #[test]
2252    fn test_skip_page_without_offset_index() {
2253        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2254
2255        // use the default SerializedFileReader, which does not read the offset index
2256        let reader_result = SerializedFileReader::new(test_file);
2257        let reader = reader_result.unwrap();
2258
2259        let row_group_reader = reader.get_row_group(0).unwrap();
2260
2261        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
2262        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2263
2264        let mut vec = vec![];
2265
2266        for i in 0..325 {
2267            if i % 2 == 0 {
2268                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2269            } else {
2270                column_page_reader.peek_next_page().unwrap().unwrap();
2271                column_page_reader.skip_next_page().unwrap();
2272            }
2273        }
2274        // check that all pages were read
2275        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2276        assert!(column_page_reader.get_next_page().unwrap().is_none());
2277
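        // as above, 163 = ceil(325 / 2) pages are read when every other page is skipped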
2278        assert_eq!(vec.len(), 163);
2279    }
2280
2281    #[test]
2282    fn test_peek_page_with_dictionary_page() {
2283        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2284        let builder = ReadOptionsBuilder::new();
2285        // enable reading the page index
2286        let options = builder.with_page_index().build();
2287        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2288        let reader = reader_result.unwrap();
2289        let row_group_reader = reader.get_row_group(0).unwrap();
2290
2291        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
2292        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2293
2294        let mut vec = vec![];
2295
2296        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2297        assert!(meta.is_dict);
2298        let page = column_page_reader.get_next_page().unwrap().unwrap();
2299        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2300
2301        for i in 0..352 {
2302            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2303            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
2304            // num_rows is either 21 or 20 for every page except the last, which has 10 rows
2305            if i != 351 {
2306                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2307            } else {
2308                // the last page starts at row index 7290 and the total row count is 7300,
2309                // so (row indexes being zero-based) it holds 10 rows
2310                assert_eq!(meta.num_rows, Some(10));
2311            }
2312            assert!(!meta.is_dict);
2313            vec.push(meta);
2314            let page = column_page_reader.get_next_page().unwrap().unwrap();
2315            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2316        }
2317
2318        // check that all pages were read
2319        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2320        assert!(column_page_reader.get_next_page().unwrap().is_none());
2321
2322        assert_eq!(vec.len(), 352);
2323    }
2324
2325    #[test]
2326    fn test_peek_page_with_dictionary_page_without_offset_index() {
2327        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2328
2329        let reader_result = SerializedFileReader::new(test_file);
2330        let reader = reader_result.unwrap();
2331        let row_group_reader = reader.get_row_group(0).unwrap();
2332
2333        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
2334        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2335
2336        let mut vec = vec![];
2337
2338        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2339        assert!(meta.is_dict);
2340        let page = column_page_reader.get_next_page().unwrap().unwrap();
2341        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2342
2343        for i in 0..352 {
2344            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2345            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
2346            // num_levels is either 21 or 20 for every page except the last, which has 10
2347            if i != 351 {
2348                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2349            } else {
2350                // the last page starts at row index 7290 and the total row count is 7300,
2351                // so (row indexes being zero-based) it holds 10 rows
2352                assert_eq!(meta.num_levels, Some(10));
2353            }
2354            assert!(!meta.is_dict);
2355            vec.push(meta);
2356            let page = column_page_reader.get_next_page().unwrap().unwrap();
2357            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2358        }
2359
2360        // check that all pages were read
2361        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2362        assert!(column_page_reader.get_next_page().unwrap().is_none());
2363
2364        assert_eq!(vec.len(), 352);
2365    }
2366
2367    #[test]
2368    fn test_fixed_length_index() {
2369        let message_type = "
2370        message test_schema {
2371          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2372        }
2373        ";
2374
2375        let schema = parse_message_type(message_type).unwrap();
2376        let mut out = Vec::with_capacity(1024);
2377        let mut writer =
2378            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2379
2380        let mut r = writer.next_row_group().unwrap();
2381        let mut c = r.next_column().unwrap().unwrap();
2382        c.typed::<FixedLenByteArrayType>()
2383            .write_batch(
2384                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2385                Some(&[1, 1, 0, 1]),
2386                None,
2387            )
2388            .unwrap();
2389        c.close().unwrap();
2390        r.close().unwrap();
2391        writer.close().unwrap();
2392
2393        let b = Bytes::from(out);
2394        let options = ReadOptionsBuilder::new().with_page_index().build();
2395        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2396        let index = reader.metadata().column_index().unwrap();
2397
2398        // 1 row group
2399        assert_eq!(index.len(), 1);
2400        let c = &index[0];
2401        // 1 column
2402        assert_eq!(c.len(), 1);
2403
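        // three non-null values ([0; 11], [5; 11], [3; 11]) plus one null were
        // written, so the single page entry should report a null count of 1
        // with min [0; 11] and max [5; 11]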
2404        match &c[0] {
2405            Index::FIXED_LEN_BYTE_ARRAY(v) => {
2406                assert_eq!(v.indexes.len(), 1);
2407                let page_idx = &v.indexes[0];
2408                assert_eq!(page_idx.null_count.unwrap(), 1);
2409                assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
2410                assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
2411            }
2412            _ => unreachable!(),
2413        }
2414    }
2415
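    // The compressed data in this file consists of multiple concatenated gzip
    // members; the gzip codec must keep decoding past the end of the first
    // member to recover all 513 values.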
2416    #[test]
2417    fn test_multi_gz() {
2418        let file = get_test_file("concatenated_gzip_members.parquet");
2419        let reader = SerializedFileReader::new(file).unwrap();
2420        let row_group_reader = reader.get_row_group(0).unwrap();
2421        match row_group_reader.get_column_reader(0).unwrap() {
2422            ColumnReader::Int64ColumnReader(mut reader) => {
2423                let mut buffer = Vec::with_capacity(1024);
2424                let mut def_levels = Vec::with_capacity(1024);
2425                let (num_records, num_values, num_levels) = reader
2426                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2427                    .unwrap();
2428
2429                assert_eq!(num_records, 513);
2430                assert_eq!(num_values, 513);
2431                assert_eq!(num_levels, 513);
2432
2433                let expected: Vec<i64> = (1..514).collect();
2434                assert_eq!(&buffer, &expected);
2435            }
2436            _ => unreachable!(),
2437        }
2438    }
2439
2440    #[test]
2441    fn test_byte_stream_split_extended() {
2442        let path = format!(
2443            "{}/byte_stream_split_extended.gzip.parquet",
2444            arrow::util::test_util::parquet_test_data(),
2445        );
2446        let file = File::open(path).unwrap();
2447        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2448
2449        // Use full schema as projected schema
2450        let mut iter = reader
2451            .get_row_iter(None)
2452            .expect("Failed to create row iterator");
2453
2454        let mut start = 0;
2455        let end = reader.metadata().file_metadata().num_rows();
2456
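        // each value is stored twice, once PLAIN-encoded and once
        // BYTE_STREAM_SPLIT-encoded, so adjacent column pairs must match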
2457        let check_row = |row: Result<Row, ParquetError>| {
2458            assert!(row.is_ok());
2459            let r = row.unwrap();
2460            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2461            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2462            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2463            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2464            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2465            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2466            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2467        };
2468
2469        while start < end {
2470            match iter.next() {
2471                Some(row) => check_row(row),
2472                None => break,
2473            };
2474            start += 1;
2475        }
2476    }
2477
2478    #[test]
2479    fn test_filtered_rowgroup_metadata() {
2480        let message_type = "
2481            message test_schema {
2482                REQUIRED INT32 a;
2483            }
2484        ";
2485        let schema = Arc::new(parse_message_type(message_type).unwrap());
2486        let props = Arc::new(
2487            WriterProperties::builder()
2488                .set_statistics_enabled(EnabledStatistics::Page)
2489                .build(),
2490        );
2491        let mut file: File = tempfile::tempfile().unwrap();
2492        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2493        let data = [1, 2, 3, 4, 5];
2494
2495        // write 5 row groups
2496        for idx in 0..5 {
2497            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2498            let mut row_group_writer = file_writer.next_row_group().unwrap();
2499            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2500                writer
2501                    .typed::<Int32Type>()
2502                    .write_batch(data_i.as_slice(), None, None)
2503                    .unwrap();
2504                writer.close().unwrap();
2505            }
2506            row_group_writer.close().unwrap();
2507            file_writer.flushed_row_groups();
2508        }
2509        let file_metadata = file_writer.close().unwrap();
2510
2511        assert_eq!(file_metadata.num_rows, 25);
2512        assert_eq!(file_metadata.row_groups.len(), 5);
2513
2514        // read only the 3rd row group
2515        let read_options = ReadOptionsBuilder::new()
2516            .with_page_index()
2517            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2518            .build();
2519        let reader =
2520            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2521                .unwrap();
2522        let metadata = reader.metadata();
2523
2524        // check we got the expected row group
2525        assert_eq!(metadata.num_row_groups(), 1);
2526        assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2527
2528        // check we only got the relevant page indexes
2529        assert!(metadata.column_index().is_some());
2530        assert!(metadata.offset_index().is_some());
2531        assert_eq!(metadata.column_index().unwrap().len(), 1);
2532        assert_eq!(metadata.offset_index().unwrap().len(), 1);
2533        let col_idx = metadata.column_index().unwrap();
2534        let off_idx = metadata.offset_index().unwrap();
2535        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2536        let pg_idx = &col_idx[0][0];
2537        let off_idx_i = &off_idx[0][0];
2538
2539        // test that we got the index matching the row group
2540        match pg_idx {
2541            Index::INT32(int_idx) => {
2542                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2543                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2544                assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2545                assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2546            }
2547            _ => panic!("wrong stats type"),
2548        }
2549
2550        // check offset index matches too
2551        assert_eq!(
2552            off_idx_i.page_locations[0].offset,
2553            metadata.row_group(0).column(0).data_page_offset()
2554        );
2555
2556        // read non-contiguous row groups
2557        let read_options = ReadOptionsBuilder::new()
2558            .with_page_index()
2559            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2560            .build();
2561        let reader =
2562            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2563                .unwrap();
2564        let metadata = reader.metadata();
2565
2566        // check we got the expected row groups
2567        assert_eq!(metadata.num_row_groups(), 2);
2568        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2569        assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2570
2571        // check we only got the relevant page indexes
2572        assert!(metadata.column_index().is_some());
2573        assert!(metadata.offset_index().is_some());
2574        assert_eq!(metadata.column_index().unwrap().len(), 2);
2575        assert_eq!(metadata.offset_index().unwrap().len(), 2);
2576        let col_idx = metadata.column_index().unwrap();
2577        let off_idx = metadata.offset_index().unwrap();
2578
2579        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2580            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2581            let pg_idx = &col_idx_i[0];
2582            let off_idx_i = &off_idx[i][0];
2583
2584            // test that we got the index matching the row group
2585            match pg_idx {
2586                Index::INT32(int_idx) => {
2587                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2588                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2589                    assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2590                    assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2591                }
2592                _ => panic!("wrong stats type"),
2593            }
2594
2595            // check offset index matches too
2596            assert_eq!(
2597                off_idx_i.page_locations[0].offset,
2598                metadata.row_group(i).column(0).data_page_offset()
2599            );
2600        }
2601    }
2602}