parquet/file/serialized_reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains implementations of the reader traits [`FileReader`], [`RowGroupReader`] and [`PageReader`].
//!
//! Also contains implementations of [`ChunkReader`] for files (with buffering) and byte arrays (in memory).

use crate::basic::{Encoding, Type};
use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::{
    metadata::*,
    properties::{ReaderProperties, ReaderPropertiesPtr},
    reader::*,
    statistics,
};
use crate::format::{PageHeader, PageLocation, PageType};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
#[cfg(feature = "encryption")]
use crate::thrift::TCompactSliceInputProtocol;
use crate::thrift::TSerializable;
use bytes::Bytes;
use std::collections::VecDeque;
use std::{fs::File, io::Read, path::Path, sync::Arc};
use thrift::protocol::TCompactInputProtocol;

impl TryFrom<File> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(file: File) -> Result<Self> {
        Self::new(file)
    }
}

impl TryFrom<&Path> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &Path) -> Result<Self> {
        let file = File::open(path)?;
        Self::try_from(file)
    }
}

impl TryFrom<String> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: String) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

impl TryFrom<&str> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &str) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

/// Conversion into a [`RowIter`]
/// using the full file schema over all row groups.
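///
/// # Example
///
/// A minimal sketch of iterating every row (the path `"data.parquet"` is a
/// hypothetical placeholder):
///
/// ```no_run
/// use std::fs::File;
/// use parquet::file::serialized_reader::SerializedFileReader;
///
/// let reader = SerializedFileReader::new(File::open("data.parquet").unwrap()).unwrap();
/// for row in reader {
///     println!("{}", row.unwrap());
/// }
/// ```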
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        RowIter::from_file_into(Box::new(self))
    }
}

// ----------------------------------------------------------------------
// Implementations of file & row group readers

/// A serialized implementation for Parquet [`FileReader`].
pub struct SerializedFileReader<R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: Arc<ParquetMetaData>,
    props: ReaderPropertiesPtr,
}

/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;

/// A builder for [`ReadOptions`].
///
/// Predicates added to the builder are chained with a logical `AND` when
/// filtering row groups.
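///
/// # Example
///
/// A minimal sketch of building read options that keep only row groups with at
/// least one row and whose midpoints fall within the first gigabyte of the file:
///
/// ```
/// use parquet::file::serialized_reader::ReadOptionsBuilder;
///
/// let options = ReadOptionsBuilder::new()
///     .with_predicate(Box::new(|rg, _idx| rg.num_rows() > 0))
///     .with_range(0, 1024 * 1024 * 1024)
///     .build();
/// ```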
#[derive(Default)]
pub struct ReadOptionsBuilder {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: Option<ReaderProperties>,
}

impl ReadOptionsBuilder {
    /// New builder
    pub fn new() -> Self {
        Self::default()
    }

    /// Add a predicate on row group metadata to the read options.
    /// Only row groups matching the predicate criteria will be scanned.
    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
        self.predicates.push(predicate);
        self
    }

    /// Add a range predicate that keeps only row groups whose midpoints lie
    /// within the half-open range `[start..end)`, i.e. `{x | start <= x < end}`
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        assert!(start < end);
        let predicate = move |rg: &RowGroupMetaData, _: usize| {
            let mid = get_midpoint_offset(rg);
            mid >= start && mid < end
        };
        self.predicates.push(Box::new(predicate));
        self
    }

    /// Enable reading the page index structures described in
    /// "[Column Index] Layout to Support Page Skipping"
    ///
    /// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    pub fn with_page_index(mut self) -> Self {
        self.enable_page_index = true;
        self
    }

    /// Set the [`ReaderProperties`] configuration.
    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
        self.props = Some(properties);
        self
    }

    /// Seal the builder and return the read options
    pub fn build(self) -> ReadOptions {
        let props = self
            .props
            .unwrap_or_else(|| ReaderProperties::builder().build());
        ReadOptions {
            predicates: self.predicates,
            enable_page_index: self.enable_page_index,
            props,
        }
    }
}

/// A collection of options for reading a Parquet file.
///
/// Currently, only predicates on row group metadata are supported.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: ReaderProperties,
}

impl<R: 'static + ChunkReader> SerializedFileReader<R> {
    /// Creates file reader from a Parquet file.
    /// Returns an error if the Parquet file does not exist or is corrupt.
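    ///
    /// # Example
    ///
    /// A minimal sketch (the path `"data.parquet"` is a hypothetical placeholder):
    ///
    /// ```no_run
    /// use std::fs::File;
    /// use parquet::file::reader::FileReader;
    /// use parquet::file::serialized_reader::SerializedFileReader;
    ///
    /// let file = File::open("data.parquet").unwrap();
    /// let reader = SerializedFileReader::new(file).unwrap();
    /// println!("row groups: {}", reader.metadata().num_row_groups());
    /// ```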
    pub fn new(chunk_reader: R) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
        let props = Arc::new(ReaderProperties::builder().build());
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props,
        })
    }

    /// Creates file reader from a Parquet file with read options.
    /// Returns an error if the Parquet file does not exist or is corrupt.
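    ///
    /// # Example
    ///
    /// A minimal sketch that skips empty row groups and enables the page index
    /// (the path `"data.parquet"` is a hypothetical placeholder):
    ///
    /// ```no_run
    /// use std::fs::File;
    /// use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};
    ///
    /// let options = ReadOptionsBuilder::new()
    ///     .with_predicate(Box::new(|rg, _idx| rg.num_rows() > 0))
    ///     .with_page_index()
    ///     .build();
    /// let reader =
    ///     SerializedFileReader::new_with_options(File::open("data.parquet").unwrap(), options)
    ///         .unwrap();
    /// ```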
    #[allow(deprecated)]
    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
        let mut metadata_builder = ParquetMetaDataReader::new()
            .parse_and_finish(&chunk_reader)?
            .into_builder();
        let mut predicates = options.predicates;

        // Filter row groups based on the predicates
        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
            let mut keep = true;
            for predicate in &mut predicates {
                if !predicate(&rg_meta, i) {
                    keep = false;
                    break;
                }
            }
            if keep {
                metadata_builder = metadata_builder.add_row_group(rg_meta);
            }
        }

        let mut metadata = metadata_builder.build();

        // If page indexes are desired, build them with the filtered set of row groups
        if options.enable_page_index {
            let mut reader =
                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
            reader.read_page_indexes(&chunk_reader)?;
            metadata = reader.finish()?;
        }

        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props: Arc::new(options.props),
        })
    }
}

/// Get midpoint offset for a row group
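///
/// The midpoint is the offset of the first page (the dictionary page when one
/// exists) plus half the row group's compressed size;
/// [`ReadOptionsBuilder::with_range`] compares this midpoint against its
/// `[start, end)` bounds.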
fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
    let col = meta.column(0);
    let mut offset = col.data_page_offset();
    if let Some(dic_offset) = col.dictionary_page_offset() {
        if offset > dic_offset {
            offset = dic_offset
        }
    };
    offset + meta.compressed_size() / 2
}

impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
    fn metadata(&self) -> &ParquetMetaData {
        &self.metadata
    }

    fn num_row_groups(&self) -> usize {
        self.metadata.num_row_groups()
    }

    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
        let row_group_metadata = self.metadata.row_group(i);
        // Row groups should be processed sequentially.
        let props = Arc::clone(&self.props);
        let f = Arc::clone(&self.chunk_reader);
        Ok(Box::new(SerializedRowGroupReader::new(
            f,
            row_group_metadata,
            self.metadata.offset_index().map(|x| x[i].as_slice()),
            props,
        )?))
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_file(projection, self)
    }
}

/// A serialized implementation for Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: &'a RowGroupMetaData,
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    props: ReaderPropertiesPtr,
    bloom_filters: Vec<Option<Sbbf>>,
}

impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
    /// Creates a new row group reader from a file, row group metadata and a custom config.
    pub fn new(
        chunk_reader: Arc<R>,
        metadata: &'a RowGroupMetaData,
        offset_index: Option<&'a [OffsetIndexMetaData]>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let bloom_filters = if props.read_bloom_filter() {
            metadata
                .columns()
                .iter()
                .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
                .collect::<Result<Vec<_>>>()?
        } else {
            std::iter::repeat_n(None, metadata.columns().len()).collect()
        };
        Ok(Self {
            chunk_reader,
            metadata,
            offset_index,
            props,
            bloom_filters,
        })
    }
}

impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
    fn metadata(&self) -> &RowGroupMetaData {
        self.metadata
    }

    fn num_columns(&self) -> usize {
        self.metadata.num_columns()
    }

    // TODO: fix PARQUET-816
    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
        let col = self.metadata.column(i);

        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());

        let props = Arc::clone(&self.props);
        Ok(Box::new(SerializedPageReader::new_with_properties(
            Arc::clone(&self.chunk_reader),
            col,
            usize::try_from(self.metadata.num_rows())?,
            page_locations,
            props,
        )?))
    }

    /// Get the bloom filter for the `i`-th column, if present.
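    ///
    /// A hedged usage sketch (assumes bloom filter reading was enabled via
    /// [`ReaderProperties`] and that the column stores string values):
    ///
    /// ```ignore
    /// let value = String::from("some_value");
    /// if let Some(sbbf) = row_group.get_column_bloom_filter(0) {
    ///     // `false` proves absence; `true` only means "possibly present"
    ///     let maybe_present = sbbf.check(&value);
    /// }
    /// ```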
    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
        self.bloom_filters[i].as_ref()
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_row_group(projection, self)
    }
}

/// Decodes a [`Page`] from the provided `buffer`
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the 32-bit CRC checksum of the page
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // When processing a data page v2, depending on the compression enabled for
    // the page, we must account for the uncompressed prefix ('offset') that
    // holds the repetition and definition levels.
    //
    // For pages other than v2 the offset is always 0; a `true` flag means
    // decompression will be applied if a decompressor is provided.
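    //
    // For example (illustrative numbers only): with 2 bytes of repetition
    // levels and 3 bytes of definition levels, offset = 5, so the first 5
    // bytes of the page buffer are copied through verbatim and only the
    // remainder is decompressed.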
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                    for definition_levels_byte_length ({}) \
                    and repetition_levels_byte_length ({}) \
                    given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        // When the is_compressed flag is missing, the page is considered compressed
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    // TODO: page header could be huge because of statistics. We should set a
    // maximum page header size and abort if that is exceeded.
    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
            if decompressed_size > 0 {
                let compressed = &buffer.as_ref()[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        _ => buffer,
    };

    let result = match page_header.type_ {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: Encoding::try_from(dict_header.encoding)?,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: Encoding::try_from(header.encoding)?,
                def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
                rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
                statistics: statistics::from_thrift(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: Encoding::try_from(header.encoding)?,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift(physical_type, header.statistics)?,
            }
        }
        _ => {
            // Unsupported page types (e.g., INDEX_PAGE) are skipped by callers
            // before decoding, so reaching this arm is unexpected.
            unimplemented!("Page type {:?} is not supported", page_header.type_)
        }
    };

    Ok(result)
}

enum SerializedPageReaderState {
    Values {
        /// The current byte offset in the reader
        /// Note that offset is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        offset: u64,

        /// The length of the chunk in bytes
        /// Note that remaining_bytes is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        remaining_bytes: u64,

        /// If the next page header has already been "peeked", it is cached here
        next_page_header: Option<Box<PageHeader>>,

        /// The index of the data page within this column chunk
        page_index: usize,

        /// Whether the next page is expected to be a dictionary page
        require_dictionary: bool,
    },
    Pages {
        /// Remaining page locations
        page_locations: VecDeque<PageLocation>,
        /// Remaining dictionary location if any
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
        /// The index of the data page within this column chunk
        page_index: usize,
    },
}

#[derive(Default)]
struct SerializedPageReaderContext {
    /// Crypto context carrying objects required for decryption
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}

/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader
    reader: Arc<R>,

    /// The compression codec for this column chunk. Only set when the codec is
    /// not `UNCOMPRESSED`.
    decompressor: Option<Box<dyn Codec>>,

    /// Column chunk type.
    physical_type: Type,

    state: SerializedPageReaderState,

    context: SerializedPageReaderContext,
}

impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and metadata
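    ///
    /// # Example
    ///
    /// A minimal sketch that reads every page of the first column of the first
    /// row group (the path `"data.parquet"` is a hypothetical placeholder):
    ///
    /// ```no_run
    /// use std::fs::File;
    /// use std::sync::Arc;
    /// use parquet::file::reader::FileReader;
    /// use parquet::file::serialized_reader::{SerializedFileReader, SerializedPageReader};
    ///
    /// let file = File::open("data.parquet").unwrap();
    /// let reader = SerializedFileReader::new(file).unwrap();
    /// let rg_meta = reader.metadata().row_group(0);
    /// let pages = SerializedPageReader::new(
    ///     Arc::new(File::open("data.parquet").unwrap()),
    ///     rg_meta.column(0),
    ///     rg_meta.num_rows() as usize,
    ///     None,
    /// )
    /// .unwrap();
    /// for page in pages {
    ///     println!("page with {} values", page.unwrap().num_values());
    /// }
    /// ```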
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// Stub no-op implementation used when the `encryption` feature is disabled.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Adds any necessary crypto context to this page reader, if encryption is enabled.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.context.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page reader with custom properties.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // If the offset of the first page doesn't match the start of the column chunk
                // then the preceding space must contain a dictionary page.
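                // For example (illustrative numbers only): if the chunk starts
                // at offset 4 and the first recorded page is at offset 100, the
                // dictionary page is assumed to occupy bytes [4, 100).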
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start,
                remaining_bytes: len,
                next_page_header: None,
                page_index: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context: Default::default(),
        })
    }

    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
    /// Unlike page metadata, an offset can uniquely identify a page.
    ///
    /// This is used when reading Parquet with a row filter, where we don't want to decompress a page twice:
    /// it lets us check whether the next page has already been cached or read.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as u64))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as u64))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn read_page_header_len<T: Read>(
        context: &SerializedPageReaderContext,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
        struct TrackedRead<R> {
            inner: R,
            bytes_read: usize,
        }

        impl<R: Read> Read for TrackedRead<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let v = self.inner.read(buf)?;
                self.bytes_read += v;
                Ok(v)
            }
        }

        let mut tracked = TrackedRead {
            inner: input,
            bytes_read: 0,
        };
        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
        Ok((tracked.bytes_read, header))
    }

    fn read_page_header_len_from_bytes(
        context: &SerializedPageReaderContext,
        buffer: &[u8],
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        let mut input = std::io::Cursor::new(buffer);
        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
        let header_len = input.position() as usize;
        Ok((header_len, header))
    }
}

#[cfg(not(feature = "encryption"))]
impl SerializedPageReaderContext {
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        _page_index: usize,
        _dictionary_page: bool,
    ) -> Result<PageHeader> {
        let mut prot = TCompactInputProtocol::new(input);
        Ok(PageHeader::read_from_in_protocol(&mut prot)?)
    }

    fn decrypt_page_data<T>(
        &self,
        buffer: T,
        _page_index: usize,
        _dictionary_page: bool,
    ) -> Result<T> {
        Ok(buffer)
    }
}

#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                let mut prot = TCompactInputProtocol::new(input);
                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
            }
        }
    }

    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}

impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
    type Item = Result<Page>;

    fn next(&mut self) -> Option<Self::Item> {
        self.get_next_page().transpose()
    }
}

fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
    if header_len as u64 > remaining_bytes {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

fn verify_page_size(
    compressed_size: i32,
    uncompressed_size: i32,
    remaining_bytes: u64,
) -> Result<()> {
    // The page's compressed size should not exceed the remaining bytes that are
    // available to read. The page's uncompressed size is the expected size
    // after decompression, which can never be negative.
    if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset)?;
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len as u64;
                        *remaining -= header_len as u64;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    *offset += data_len as u64;
                    *remaining -= data_len as u64;

                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The next page header has already been peeked, so just advance the offset
                    *offset += buffered_header.compressed_page_size as u64;
                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
                } else {
                    let mut read = self.reader.get_read(*offset)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    let data_page_size = header.compressed_page_size as u64;
                    *offset += header_len as u64 + data_page_size;
                    *remaining_bytes -= header_len as u64 + data_page_size;
                }
                if *require_dictionary {
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // If a dictionary page exists, consume it by taking it (sets to None)
                    dictionary_page.take();
                } else {
                    // If no dictionary page exists, simply pop the data page from page_locations
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Buf;

    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use crate::format::BoundaryOrder;

    use crate::basic::{self, ColumnOrder, SortOrder};
    use crate::column::reader::ColumnReader;
    use crate::data_type::private::ParquetValueType;
    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
    use crate::file::page_index::index::{Index, NativeIndex};
    #[allow(deprecated)]
    use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
    use crate::file::writer::SerializedFileWriter;
    use crate::record::RowAccessor;
    use crate::schema::parser::parse_message_type;
    use crate::util::test_common::file_util::{get_test_file, get_test_path};

    use super::*;

    #[test]
    fn test_cursor_and_file_has_the_same_behaviour() {
        let mut buf: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut buf)
            .unwrap();
        let cursor = Bytes::from(buf);
        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_from_file = SerializedFileReader::new(test_file).unwrap();

        let file_iter = read_from_file.get_row_iter(None).unwrap();
        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();

        for (a, b) in file_iter.zip(cursor_iter) {
            assert_eq!(a.unwrap(), b.unwrap())
        }
    }

    #[test]
    fn test_file_reader_try_from() {
        // Valid file path
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        // Invalid file path
        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }

    #[test]
    fn test_file_reader_into_iter() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let iter = reader.into_iter();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[test]
    fn test_file_reader_into_iter_project() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let schema = "message schema { OPTIONAL INT32 id; }";
        let proj = parse_message_type(schema).ok();
        let iter = reader.into_iter().project(proj).unwrap();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[test]
    fn test_reuse_file_chunk() {
        // This test covers the case of maintaining the correct start position in a file
        // stream for each column reader after initializing and moving to the next one
        // (without necessarily reading the entire column).
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file).unwrap();
        let row_group = reader.get_row_group(0).unwrap();

        let mut page_readers = Vec::new();
        for i in 0..row_group.num_columns() {
            page_readers.push(row_group.get_column_page_reader(i).unwrap());
        }

        // Now buffer each col reader, we do not expect any failures like:
        // General("underlying Thrift error: end of file")
        for mut page_reader in page_readers {
            assert!(page_reader.get_next_page().is_ok());
        }
    }

    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Test contents in row group metadata
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    #[test]
    fn test_file_reader_empty_compressed_datapage_v2() {
1411        // this file has a compressed data page that decompresses to 0 bytes
1412        let test_file = get_test_file("page_v2_empty_compressed.parquet");
1413        let reader_result = SerializedFileReader::new(test_file);
1414        assert!(reader_result.is_ok());
1415        let reader = reader_result.unwrap();
1416
1417        // Test contents in Parquet metadata
1418        let metadata = reader.metadata();
1419        assert_eq!(metadata.num_row_groups(), 1);
1420
1421        // Test contents in file metadata
1422        let file_metadata = metadata.file_metadata();
1423        assert!(file_metadata.created_by().is_some());
1424        assert_eq!(
1425            file_metadata.created_by().unwrap(),
1426            "parquet-cpp-arrow version 14.0.2"
1427        );
1428        assert!(file_metadata.key_value_metadata().is_some());
1429        assert_eq!(
1430            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1431            1
1432        );
1433
1434        assert_eq!(file_metadata.num_rows(), 10);
1435        assert_eq!(file_metadata.version(), 2);
1436        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1437        assert_eq!(
1438            file_metadata.column_orders(),
1439            Some(vec![expected_order].as_ref())
1440        );
1441
1442        let row_group_metadata = metadata.row_group(0);
1443
1444        // Check each column order
1445        for i in 0..row_group_metadata.num_columns() {
1446            assert_eq!(file_metadata.column_order(i), expected_order);
1447        }
1448
1449        // Test row group reader
1450        let row_group_reader_result = reader.get_row_group(0);
1451        assert!(row_group_reader_result.is_ok());
1452        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1453        assert_eq!(
1454            row_group_reader.num_columns(),
1455            row_group_metadata.num_columns()
1456        );
1457        assert_eq!(
1458            row_group_reader.metadata().total_byte_size(),
1459            row_group_metadata.total_byte_size()
1460        );
1461
1462        // Test page readers
1463        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1464        assert!(page_reader_0_result.is_ok());
1465        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1466        let mut page_count = 0;
1467        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1468            let is_expected_page = match page {
1469                Page::DictionaryPage {
1470                    buf,
1471                    num_values,
1472                    encoding,
1473                    is_sorted,
1474                } => {
1475                    assert_eq!(buf.len(), 0);
1476                    assert_eq!(num_values, 0);
1477                    assert_eq!(encoding, Encoding::PLAIN);
1478                    assert!(!is_sorted);
1479                    true
1480                }
1481                Page::DataPageV2 {
1482                    buf,
1483                    num_values,
1484                    encoding,
1485                    num_nulls,
1486                    num_rows,
1487                    def_levels_byte_len,
1488                    rep_levels_byte_len,
1489                    is_compressed,
1490                    statistics,
1491                } => {
1492                    assert_eq!(buf.len(), 3);
1493                    assert_eq!(num_values, 10);
1494                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1495                    assert_eq!(num_nulls, 10);
1496                    assert_eq!(num_rows, 10);
1497                    assert_eq!(def_levels_byte_len, 2);
1498                    assert_eq!(rep_levels_byte_len, 0);
1499                    assert!(is_compressed);
1500                    assert!(statistics.is_some());
1501                    true
1502                }
1503                _ => false,
1504            };
1505            assert!(is_expected_page);
1506            page_count += 1;
1507        }
1508        assert_eq!(page_count, 2);
1509    }
1510
1511    #[test]
1512    fn test_file_reader_empty_datapage_v2() {
1513        // this file has a 0-byte compressed data page that decompresses to 0 bytes
1514        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
1515        let reader_result = SerializedFileReader::new(test_file);
1516        assert!(reader_result.is_ok());
1517        let reader = reader_result.unwrap();
1518
1519        // Test contents in Parquet metadata
1520        let metadata = reader.metadata();
1521        assert_eq!(metadata.num_row_groups(), 1);
1522
1523        // Test contents in file metadata
1524        let file_metadata = metadata.file_metadata();
1525        assert!(file_metadata.created_by().is_some());
1526        assert_eq!(
1527            file_metadata.created_by().unwrap(),
1528            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
1529        );
1530        assert!(file_metadata.key_value_metadata().is_some());
1531        assert_eq!(
1532            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1533            2
1534        );
1535
1536        assert_eq!(file_metadata.num_rows(), 1);
1537        assert_eq!(file_metadata.version(), 1);
1538        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1539        assert_eq!(
1540            file_metadata.column_orders(),
1541            Some(vec![expected_order].as_ref())
1542        );
1543
1544        let row_group_metadata = metadata.row_group(0);
1545
1546        // Check each column order
1547        for i in 0..row_group_metadata.num_columns() {
1548            assert_eq!(file_metadata.column_order(i), expected_order);
1549        }
1550
1551        // Test row group reader
1552        let row_group_reader_result = reader.get_row_group(0);
1553        assert!(row_group_reader_result.is_ok());
1554        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1555        assert_eq!(
1556            row_group_reader.num_columns(),
1557            row_group_metadata.num_columns()
1558        );
1559        assert_eq!(
1560            row_group_reader.metadata().total_byte_size(),
1561            row_group_metadata.total_byte_size()
1562        );
1563
1564        // Test page readers
1565        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1566        assert!(page_reader_0_result.is_ok());
1567        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1568        let mut page_count = 0;
1569        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1570            let is_expected_page = match page {
1571                Page::DataPageV2 {
1572                    buf,
1573                    num_values,
1574                    encoding,
1575                    num_nulls,
1576                    num_rows,
1577                    def_levels_byte_len,
1578                    rep_levels_byte_len,
1579                    is_compressed,
1580                    statistics,
1581                } => {
1582                    assert_eq!(buf.len(), 2);
1583                    assert_eq!(num_values, 1);
1584                    assert_eq!(encoding, Encoding::PLAIN);
1585                    assert_eq!(num_nulls, 1);
1586                    assert_eq!(num_rows, 1);
1587                    assert_eq!(def_levels_byte_len, 2);
1588                    assert_eq!(rep_levels_byte_len, 0);
1589                    assert!(is_compressed);
1590                    assert!(statistics.is_none());
1591                    true
1592                }
1593                _ => false,
1594            };
1595            assert!(is_expected_page);
1596            page_count += 1;
1597        }
1598        assert_eq!(page_count, 1);
1599    }
1600
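    /// Builds a `SerializedPageReader` for one column chunk of the given row
    /// group, forwarding the page locations from the file's offset index when
    /// one was loaded.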
1601    fn get_serialized_page_reader<R: ChunkReader>(
1602        file_reader: &SerializedFileReader<R>,
1603        row_group: usize,
1604        column: usize,
1605    ) -> Result<SerializedPageReader<R>> {
1606        let row_group = {
1607            let row_group_metadata = file_reader.metadata.row_group(row_group);
1608            let props = Arc::clone(&file_reader.props);
1609            let f = Arc::clone(&file_reader.chunk_reader);
1610            SerializedRowGroupReader::new(
1611                f,
1612                row_group_metadata,
1613                file_reader
1614                    .metadata
1615                    .offset_index()
1616                    .map(|x| x[row_group].as_slice()),
1617                props,
1618            )?
1619        };
1620
1621        let col = row_group.metadata.column(column);
1622
1623        let page_locations = row_group
1624            .offset_index
1625            .map(|x| x[column].page_locations.clone());
1626
1627        let props = Arc::clone(&row_group.props);
1628        SerializedPageReader::new_with_properties(
1629            Arc::clone(&row_group.chunk_reader),
1630            col,
1631            usize::try_from(row_group.metadata.num_rows())?,
1632            page_locations,
1633            props,
1634        )
1635    }
1636
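    /// Verifies that the offset reported by `peek_next_page_offset` matches the
    /// reader's internal state and the page returned next, and that no page
    /// offset is ever reported twice.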
1637    #[test]
1638    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
1639        let test_file = get_test_file("alltypes_plain.parquet");
1640        let reader = SerializedFileReader::new(test_file)?;
1641
1642        let mut offset_set = HashSet::new();
1643        let num_row_groups = reader.metadata.num_row_groups();
1644        for row_group in 0..num_row_groups {
1645            let num_columns = reader.metadata.row_group(row_group).num_columns();
1646            for column in 0..num_columns {
1647                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;
1648
1649                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
1650                    match &page_reader.state {
1651                        SerializedPageReaderState::Pages {
1652                            page_locations,
1653                            dictionary_page,
1654                            ..
1655                        } => {
1656                            if let Some(page) = dictionary_page {
1657                                assert_eq!(page.offset as u64, page_offset);
1658                            } else if let Some(page) = page_locations.front() {
1659                                assert_eq!(page.offset as u64, page_offset);
1660                            } else {
1661                                unreachable!()
1662                            }
1663                        }
1664                        SerializedPageReaderState::Values {
1665                            offset,
1666                            next_page_header,
1667                            ..
1668                        } => {
1669                            assert!(next_page_header.is_some());
1670                            assert_eq!(*offset, page_offset);
1671                        }
1672                    }
1673                    let page = page_reader.get_next_page()?;
1674                    assert!(page.is_some());
1675                    let newly_inserted = offset_set.insert(page_offset);
1676                    assert!(newly_inserted);
1677                }
1678            }
1679        }
1680
1681        Ok(())
1682    }
1683
1684    #[test]
1685    fn test_page_iterator() {
1686        let file = get_test_file("alltypes_plain.parquet");
1687        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1688
1689        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1690
1691        // read first page
1692        let page = page_iterator.next();
1693        assert!(page.is_some());
1694        assert!(page.unwrap().is_ok());
1695
1696        // reach end of file
1697        let page = page_iterator.next();
1698        assert!(page.is_none());
1699
1700        let row_group_indices = Box::new(0..1);
1701        let mut page_iterator =
1702            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1703
1704        // read first page
1705        let page = page_iterator.next();
1706        assert!(page.is_some());
1707        assert!(page.unwrap().is_ok());
1708
1709        // reach end of file
1710        let page = page_iterator.next();
1711        assert!(page.is_none());
1712    }
1713
1714    #[test]
1715    fn test_file_reader_key_value_metadata() {
1716        let file = get_test_file("binary.parquet");
1717        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1718
1719        let metadata = file_reader
1720            .metadata
1721            .file_metadata()
1722            .key_value_metadata()
1723            .unwrap();
1724
1725        assert_eq!(metadata.len(), 3);
1726
1727        assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1728
1729        assert_eq!(metadata[1].key, "writer.model.name");
1730        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1731
1732        assert_eq!(metadata[2].key, "parquet.proto.class");
1733        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1734    }
1735
1736    #[test]
1737    fn test_file_reader_optional_metadata() {
1738        // file with optional metadata: bloom filters, encoding stats, column index and offset index.
1739        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1740        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1741
1742        let row_group_metadata = file_reader.metadata.row_group(0);
1743        let col0_metadata = row_group_metadata.column(0);
1744
1745        // test optional bloom filter offset
1746        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1747
1748        // test page encoding stats
1749        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1750
1751        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1752        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1753        assert_eq!(page_encoding_stats.count, 1);
1754
1755        // test optional column index offset
1756        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1757        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1758
1759        // test optional offset index offset
1760        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1761        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1762    }
1763
1764    #[test]
1765    fn test_file_reader_with_no_filter() -> Result<()> {
1766        let test_file = get_test_file("alltypes_plain.parquet");
1767        let origin_reader = SerializedFileReader::new(test_file)?;
1768        // test initial number of row groups
1769        let metadata = origin_reader.metadata();
1770        assert_eq!(metadata.num_row_groups(), 1);
1771        Ok(())
1772    }
1773
1774    #[test]
1775    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1776        let test_file = get_test_file("alltypes_plain.parquet");
1777        let read_options = ReadOptionsBuilder::new()
1778            .with_predicate(Box::new(|_, _| false))
1779            .build();
1780        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1781        let metadata = reader.metadata();
1782        assert_eq!(metadata.num_row_groups(), 0);
1783        Ok(())
1784    }
1785
1786    #[test]
1787    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1788        let test_file = get_test_file("alltypes_plain.parquet");
1789        let origin_reader = SerializedFileReader::new(test_file)?;
1790        // test initial number of row groups
1791        let metadata = origin_reader.metadata();
1792        assert_eq!(metadata.num_row_groups(), 1);
1793        let mid = get_midpoint_offset(metadata.row_group(0));
1794
1795        let test_file = get_test_file("alltypes_plain.parquet");
1796        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1797        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1798        let metadata = reader.metadata();
1799        assert_eq!(metadata.num_row_groups(), 1);
1800
1801        let test_file = get_test_file("alltypes_plain.parquet");
1802        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1803        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1804        let metadata = reader.metadata();
1805        assert_eq!(metadata.num_row_groups(), 0);
1806        Ok(())
1807    }
1808
1809    #[test]
1810    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1811        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1812        let origin_reader = SerializedFileReader::new(test_file)?;
1813        let metadata = origin_reader.metadata();
1814        let mid = get_midpoint_offset(metadata.row_group(0));
1815
1816        // true, true predicate
1817        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1818        let read_options = ReadOptionsBuilder::new()
1819            .with_page_index()
1820            .with_predicate(Box::new(|_, _| true))
1821            .with_range(mid, mid + 1)
1822            .build();
1823        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1824        let metadata = reader.metadata();
1825        assert_eq!(metadata.num_row_groups(), 1);
1826        assert_eq!(metadata.column_index().unwrap().len(), 1);
1827        assert_eq!(metadata.offset_index().unwrap().len(), 1);
1828
1829        // true, false predicate
1830        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1831        let read_options = ReadOptionsBuilder::new()
1832            .with_page_index()
1833            .with_predicate(Box::new(|_, _| true))
1834            .with_range(0, mid)
1835            .build();
1836        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1837        let metadata = reader.metadata();
1838        assert_eq!(metadata.num_row_groups(), 0);
1839        assert!(metadata.column_index().is_none());
1840        assert!(metadata.offset_index().is_none());
1841
1842        // false, true predicate
1843        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1844        let read_options = ReadOptionsBuilder::new()
1845            .with_page_index()
1846            .with_predicate(Box::new(|_, _| false))
1847            .with_range(mid, mid + 1)
1848            .build();
1849        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1850        let metadata = reader.metadata();
1851        assert_eq!(metadata.num_row_groups(), 0);
1852        assert!(metadata.column_index().is_none());
1853        assert!(metadata.offset_index().is_none());
1854
1855        // false, false predicate
1856        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1857        let read_options = ReadOptionsBuilder::new()
1858            .with_page_index()
1859            .with_predicate(Box::new(|_, _| false))
1860            .with_range(0, mid)
1861            .build();
1862        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1863        let metadata = reader.metadata();
1864        assert_eq!(metadata.num_row_groups(), 0);
1865        assert!(metadata.column_index().is_none());
1866        assert!(metadata.offset_index().is_none());
1867        Ok(())
1868    }
1869
1870    #[test]
1871    fn test_file_reader_invalid_metadata() {
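        // ends with a footer length of 16 and the PAR1 magic, but the footer
        // bytes themselves are not valid metadata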
1872        let data = [
1873            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1874            80, 65, 82, 49,
1875        ];
1876        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1877        assert_eq!(
1878            ret.err().unwrap().to_string(),
1879            "Parquet error: Could not parse metadata: bad data"
1880        );
1881    }
1882
1883    #[test]
1884    // Use java parquet-tools to get the page index info below:
1885    // ```
1886    // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
1887    // row group 0:
1888    // column index for column String:
1889    // Boundary order: ASCENDING
1890    // page-0  :
1891    // null count                 min                                  max
1892    // 0                          Hello                                today
1893    //
1894    // offset index for column String:
1895    // page-0   :
1896    // offset   compressed size       first row index
1897    // 4               152                     0
1898    // ```
1899    //
1900    fn test_page_index_reader() {
1901        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
1902        let builder = ReadOptionsBuilder::new();
1903        // enable reading the page index
1904        let options = builder.with_page_index().build();
1905        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1906        let reader = reader_result.unwrap();
1907
1908        // Test contents in Parquet metadata
1909        let metadata = reader.metadata();
1910        assert_eq!(metadata.num_row_groups(), 1);
1911
1912        let column_index = metadata.column_index().unwrap();
1913
1914        // only one row group
1915        assert_eq!(column_index.len(), 1);
1916        let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
1917            index
1918        } else {
1919            unreachable!()
1920        };
1921
1922        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
1923        let index_in_pages = &index.indexes;
1924
1925        // only one page for this column
1926        assert_eq!(index_in_pages.len(), 1);
1927
1928        let page0 = &index_in_pages[0];
1929        let min = page0.min.as_ref().unwrap();
1930        let max = page0.max.as_ref().unwrap();
1931        assert_eq!(b"Hello", min.as_bytes());
1932        assert_eq!(b"today", max.as_bytes());
1933
1934        let offset_indexes = metadata.offset_index().unwrap();
1935        // only one row group
1936        assert_eq!(offset_indexes.len(), 1);
1937        let offset_index = &offset_indexes[0];
1938        let page_offset = &offset_index[0].page_locations()[0];
1939
1940        assert_eq!(4, page_offset.offset);
1941        assert_eq!(152, page_offset.compressed_page_size);
1942        assert_eq!(0, page_offset.first_row_index);
1943    }
1944
1945    #[test]
1946    #[allow(deprecated)]
1947    fn test_page_index_reader_out_of_order() {
1948        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1949        let options = ReadOptionsBuilder::new().with_page_index().build();
1950        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
1951        let metadata = reader.metadata();
1952
1953        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1954        let columns = metadata.row_group(0).columns();
1955        let reversed: Vec<_> = columns.iter().cloned().rev().collect();
1956
1957        let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
1958        let mut b = read_columns_indexes(&test_file, &reversed)
1959            .unwrap()
1960            .unwrap();
1961        b.reverse();
1962        assert_eq!(a, b);
1963
1964        let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
1965        let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
1966        b.reverse();
1967        assert_eq!(a, b);
1968    }
1969
1970    #[test]
1971    fn test_page_index_reader_all_type() {
1972        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1973        let builder = ReadOptionsBuilder::new();
1974        // enable reading the page index
1975        let options = builder.with_page_index().build();
1976        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1977        let reader = reader_result.unwrap();
1978
1979        // Test contents in Parquet metadata
1980        let metadata = reader.metadata();
1981        assert_eq!(metadata.num_row_groups(), 1);
1982
1983        let column_index = metadata.column_index().unwrap();
1984        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
1985
1986        // only one row group
1987        assert_eq!(column_index.len(), 1);
1988        let row_group_metadata = metadata.row_group(0);
1989
1990        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
1991        assert!(!&column_index[0][0].is_sorted());
1992        let boundary_order = &column_index[0][0].get_boundary_order();
1993        assert!(boundary_order.is_some());
1994        assert!(matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED));
1995        if let Index::INT32(index) = &column_index[0][0] {
1996            check_native_page_index(
1997                index,
1998                325,
1999                get_row_group_min_max_bytes(row_group_metadata, 0),
2000                BoundaryOrder::UNORDERED,
2001            );
2002            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2003        } else {
2004            unreachable!()
2005        };
2006        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
2007        assert!(&column_index[0][1].is_sorted());
2008        if let Index::BOOLEAN(index) = &column_index[0][1] {
2009            assert_eq!(index.indexes.len(), 82);
2010            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2011        } else {
2012            unreachable!()
2013        };
2014        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2015        assert!(&column_index[0][2].is_sorted());
2016        if let Index::INT32(index) = &column_index[0][2] {
2017            check_native_page_index(
2018                index,
2019                325,
2020                get_row_group_min_max_bytes(row_group_metadata, 2),
2021                BoundaryOrder::ASCENDING,
2022            );
2023            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2024        } else {
2025            unreachable!()
2026        };
2027        //col3->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2028        assert!(&column_index[0][3].is_sorted());
2029        if let Index::INT32(index) = &column_index[0][3] {
2030            check_native_page_index(
2031                index,
2032                325,
2033                get_row_group_min_max_bytes(row_group_metadata, 3),
2034                BoundaryOrder::ASCENDING,
2035            );
2036            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2037        } else {
2038            unreachable!()
2039        };
2040        //col4->int_col: INT32 UNCOMPRESSED VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2041        assert!(&column_index[0][4].is_sorted());
2042        if let Index::INT32(index) = &column_index[0][4] {
2043            check_native_page_index(
2044                index,
2045                325,
2046                get_row_group_min_max_bytes(row_group_metadata, 4),
2047                BoundaryOrder::ASCENDING,
2048            );
2049            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2050        } else {
2051            unreachable!()
2052        };
2053        //col5->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
2054        assert!(!&column_index[0][5].is_sorted());
2055        if let Index::INT64(index) = &column_index[0][5] {
2056            check_native_page_index(
2057                index,
2058                528,
2059                get_row_group_min_max_bytes(row_group_metadata, 5),
2060                BoundaryOrder::UNORDERED,
2061            );
2062            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2063        } else {
2064            unreachable!()
2065        };
2066        //col6->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
2067        assert!(&column_index[0][6].is_sorted());
2068        if let Index::FLOAT(index) = &column_index[0][6] {
2069            check_native_page_index(
2070                index,
2071                325,
2072                get_row_group_min_max_bytes(row_group_metadata, 6),
2073                BoundaryOrder::ASCENDING,
2074            );
2075            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2076        } else {
2077            unreachable!()
2078        };
2079        //col7->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
2080        assert!(!&column_index[0][7].is_sorted());
2081        if let Index::DOUBLE(index) = &column_index[0][7] {
2082            check_native_page_index(
2083                index,
2084                528,
2085                get_row_group_min_max_bytes(row_group_metadata, 7),
2086                BoundaryOrder::UNORDERED,
2087            );
2088            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2089        } else {
2090            unreachable!()
2091        };
2092        //col8->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
2093        assert!(!&column_index[0][8].is_sorted());
2094        if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
2095            check_native_page_index(
2096                index,
2097                974,
2098                get_row_group_min_max_bytes(row_group_metadata, 8),
2099                BoundaryOrder::UNORDERED,
2100            );
2101            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2102        } else {
2103            unreachable!()
2104        };
2105        //col9->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2106        assert!(&column_index[0][9].is_sorted());
2107        if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
2108            check_native_page_index(
2109                index,
2110                352,
2111                get_row_group_min_max_bytes(row_group_metadata, 9),
2112                BoundaryOrder::ASCENDING,
2113            );
2114            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2115        } else {
2116            unreachable!()
2117        };
2118        //col10->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
2119        //Notice: min/max values do not exist for any page of this column.
2120        assert!(!&column_index[0][10].is_sorted());
2121        if let Index::NONE = &column_index[0][10] {
2122            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2123        } else {
2124            unreachable!()
2125        };
2126        //col11->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
2127        assert!(&column_index[0][11].is_sorted());
2128        if let Index::INT32(index) = &column_index[0][11] {
2129            check_native_page_index(
2130                index,
2131                325,
2132                get_row_group_min_max_bytes(row_group_metadata, 11),
2133                BoundaryOrder::ASCENDING,
2134            );
2135            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2136        } else {
2137            unreachable!()
2138        };
2139        //col12->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
2140        assert!(!&column_index[0][12].is_sorted());
2141        if let Index::INT32(index) = &column_index[0][12] {
2142            check_native_page_index(
2143                index,
2144                325,
2145                get_row_group_min_max_bytes(row_group_metadata, 12),
2146                BoundaryOrder::UNORDERED,
2147            );
2148            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2149        } else {
2150            unreachable!()
2151        };
2152    }
2153
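    /// Asserts that a native page index holds `page_size` entries with the
    /// expected boundary order, and that each page's min/max lies within the
    /// row group's overall min/max statistics.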
2154    fn check_native_page_index<T: ParquetValueType>(
2155        row_group_index: &NativeIndex<T>,
2156        page_size: usize,
2157        min_max: (&[u8], &[u8]),
2158        boundary_order: BoundaryOrder,
2159    ) {
2160        assert_eq!(row_group_index.indexes.len(), page_size);
2161        assert_eq!(row_group_index.boundary_order, boundary_order);
2162        assert!(row_group_index.indexes.iter().all(|x| {
2163            x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
2164                && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
2165        }));
2166    }
2167
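    /// Returns the raw (min, max) statistics bytes for one column chunk of the
    /// row group.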
2168    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2169        let statistics = r.column(col_num).statistics().unwrap();
2170        (
2171            statistics.min_bytes_opt().unwrap_or_default(),
2172            statistics.max_bytes_opt().unwrap_or_default(),
2173        )
2174    }
2175
2176    #[test]
2177    fn test_skip_next_page_with_dictionary_page() {
2178        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2179        let builder = ReadOptionsBuilder::new();
2180        // enable reading the page index
2181        let options = builder.with_page_index().build();
2182        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2183        let reader = reader_result.unwrap();
2184
2185        let row_group_reader = reader.get_row_group(0).unwrap();
2186
2187        // use 'string_col', Boundary order: UNORDERED, 352 data pages and 1 dictionary page in total.
2188        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2189
2190        let mut vec = vec![];
2191
2192        // Step 1: Peek and ensure dictionary page is correctly identified
2193        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2194        assert!(meta.is_dict);
2195
2196        // Step 2: Call skip_next_page to skip the dictionary page
2197        column_page_reader.skip_next_page().unwrap();
2198
2199        // Step 3: Read the next data page after skipping the dictionary page
2200        let page = column_page_reader.get_next_page().unwrap().unwrap();
2201        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2202
2203        // Step 4: Continue reading remaining data pages and verify correctness
2204        for _i in 0..351 {
2205            // 352 data pages in total; the dictionary page was skipped and one data page was already read above
2206            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2207            assert!(!meta.is_dict); // Verify no dictionary page here
2208            vec.push(meta);
2209
2210            let page = column_page_reader.get_next_page().unwrap().unwrap();
2211            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2212        }
2213
2214        // Step 5: Check if all pages are read
2215        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2216        assert!(column_page_reader.get_next_page().unwrap().is_none());
2217
2218        // Step 6: Verify the number of data pages read (should be 351 data pages)
2219        assert_eq!(vec.len(), 351);
2220    }
2221
2222    #[test]
2223    fn test_skip_page_with_offset_index() {
2224        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2225        let builder = ReadOptionsBuilder::new();
2226        // enable reading the page index
2227        let options = builder.with_page_index().build();
2228        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2229        let reader = reader_result.unwrap();
2230
2231        let row_group_reader = reader.get_row_group(0).unwrap();
2232
2233        // use 'int_col', Boundary order: ASCENDING, 325 pages in total.
2234        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2235
2236        let mut vec = vec![];
2237
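        // read the even-indexed pages and skip the odd-indexed ones, so 163 of
        // the 325 pages end up in `vec`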
2238        for i in 0..325 {
2239            if i % 2 == 0 {
2240                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2241            } else {
2242                column_page_reader.skip_next_page().unwrap();
2243            }
2244        }
2245        // check that all pages were read.
2246        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2247        assert!(column_page_reader.get_next_page().unwrap().is_none());
2248
2249        assert_eq!(vec.len(), 163);
2250    }
2251
2252    #[test]
2253    fn test_skip_page_without_offset_index() {
2254        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2255
2256        // use the default SerializedFileReader, without reading the offset index
2257        let reader_result = SerializedFileReader::new(test_file);
2258        let reader = reader_result.unwrap();
2259
2260        let row_group_reader = reader.get_row_group(0).unwrap();
2261
2262        // use 'int_col', Boundary order: ASCENDING, 325 pages in total.
2263        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2264
2265        let mut vec = vec![];
2266
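        // same even/odd pattern, but without an offset index each skip peeks
        // first so that the page header has already been read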
2267        for i in 0..325 {
2268            if i % 2 == 0 {
2269                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2270            } else {
2271                column_page_reader.peek_next_page().unwrap().unwrap();
2272                column_page_reader.skip_next_page().unwrap();
2273            }
2274        }
2275        // check that all pages were read.
2276        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2277        assert!(column_page_reader.get_next_page().unwrap().is_none());
2278
2279        assert_eq!(vec.len(), 163);
2280    }
2281
2282    #[test]
2283    fn test_peek_page_with_dictionary_page() {
2284        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2285        let builder = ReadOptionsBuilder::new();
2286        // enable reading the page index
2287        let options = builder.with_page_index().build();
2288        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2289        let reader = reader_result.unwrap();
2290        let row_group_reader = reader.get_row_group(0).unwrap();
2291
2292        // use 'string_col', Boundary order: UNORDERED, 352 data pages and 1 dictionary page in total.
2293        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2294
2295        let mut vec = vec![];
2296
2297        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2298        assert!(meta.is_dict);
2299        let page = column_page_reader.get_next_page().unwrap().unwrap();
2300        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2301
2302        for i in 0..352 {
2303            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2304            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
2305            // page metadata reports num_rows of either 21 or 20, except for the last page.
2306            if i != 351 {
2307                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2308            } else {
2309                // the last page starts at row index 7290 and the total row count is 7300,
2310                // so the last page holds exactly 10 rows.
2311                assert_eq!(meta.num_rows, Some(10));
2312            }
2313            assert!(!meta.is_dict);
2314            vec.push(meta);
2315            let page = column_page_reader.get_next_page().unwrap().unwrap();
2316            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2317        }
2318
2319        // check that all pages were read.
2320        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2321        assert!(column_page_reader.get_next_page().unwrap().is_none());
2322
2323        assert_eq!(vec.len(), 352);
2324    }
2325
2326    #[test]
2327    fn test_peek_page_with_dictionary_page_without_offset_index() {
2328        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2329
2330        let reader_result = SerializedFileReader::new(test_file);
2331        let reader = reader_result.unwrap();
2332        let row_group_reader = reader.get_row_group(0).unwrap();
2333
2334        // use 'string_col', Boundary order: UNORDERED, 352 data pages and 1 dictionary page in total.
2335        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2336
2337        let mut vec = vec![];
2338
2339        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2340        assert!(meta.is_dict);
2341        let page = column_page_reader.get_next_page().unwrap().unwrap();
2342        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2343
2344        for i in 0..352 {
2345            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2346            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
2347            // page metadata reports num_levels of either 21 or 20, except for the last page.
2348            if i != 351 {
2349                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2350            } else {
2351                // the last page starts at row index 7290 and the total row count is 7300,
2352                // so the last page holds exactly 10 levels.
2353                assert_eq!(meta.num_levels, Some(10));
2354            }
2355            assert!(!meta.is_dict);
2356            vec.push(meta);
2357            let page = column_page_reader.get_next_page().unwrap().unwrap();
2358            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2359        }
2360
2361        // check that all pages were read.
2362        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2363        assert!(column_page_reader.get_next_page().unwrap().is_none());
2364
2365        assert_eq!(vec.len(), 352);
2366    }
2367
2368    #[test]
2369    fn test_fixed_length_index() {
2370        let message_type = "
2371        message test_schema {
2372          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2373        }
2374        ";
2375
2376        let schema = parse_message_type(message_type).unwrap();
2377        let mut out = Vec::with_capacity(1024);
2378        let mut writer =
2379            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2380
2381        let mut r = writer.next_row_group().unwrap();
2382        let mut c = r.next_column().unwrap().unwrap();
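        // write three non-null values plus one null (definition level 0), so the
        // page index below reports a null count of 1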
2383        c.typed::<FixedLenByteArrayType>()
2384            .write_batch(
2385                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2386                Some(&[1, 1, 0, 1]),
2387                None,
2388            )
2389            .unwrap();
2390        c.close().unwrap();
2391        r.close().unwrap();
2392        writer.close().unwrap();
2393
2394        let b = Bytes::from(out);
2395        let options = ReadOptionsBuilder::new().with_page_index().build();
2396        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2397        let index = reader.metadata().column_index().unwrap();
2398
2399        // 1 row group
2400        assert_eq!(index.len(), 1);
2401        let c = &index[0];
2402        // 1 column
2403        assert_eq!(c.len(), 1);
2404
2405        match &c[0] {
2406            Index::FIXED_LEN_BYTE_ARRAY(v) => {
2407                assert_eq!(v.indexes.len(), 1);
2408                let page_idx = &v.indexes[0];
2409                assert_eq!(page_idx.null_count.unwrap(), 1);
2410                assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
2411                assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
2412            }
2413            _ => unreachable!(),
2414        }
2415    }
2416
2417    #[test]
2418    fn test_multi_gz() {
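        // the gzip-compressed column chunk in this file consists of multiple
        // concatenated gzip members; the codec must decompress all of them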
2419        let file = get_test_file("concatenated_gzip_members.parquet");
2420        let reader = SerializedFileReader::new(file).unwrap();
2421        let row_group_reader = reader.get_row_group(0).unwrap();
2422        match row_group_reader.get_column_reader(0).unwrap() {
2423            ColumnReader::Int64ColumnReader(mut reader) => {
2424                let mut buffer = Vec::with_capacity(1024);
2425                let mut def_levels = Vec::with_capacity(1024);
2426                let (num_records, num_values, num_levels) = reader
2427                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2428                    .unwrap();
2429
2430                assert_eq!(num_records, 513);
2431                assert_eq!(num_values, 513);
2432                assert_eq!(num_levels, 513);
2433
2434                let expected: Vec<i64> = (1..514).collect();
2435                assert_eq!(&buffer, &expected);
2436            }
2437            _ => unreachable!(),
2438        }
2439    }
2440
2441    #[test]
2442    fn test_byte_stream_split_extended() {
2443        let path = format!(
2444            "{}/byte_stream_split_extended.gzip.parquet",
2445            arrow::util::test_util::parquet_test_data(),
2446        );
2447        let file = File::open(path).unwrap();
2448        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2449
2450        // Use full schema as projected schema
2451        let mut iter = reader
2452            .get_row_iter(None)
2453            .expect("Failed to create row iterator");
2454
2455        let mut start = 0;
2456        let end = reader.metadata().file_metadata().num_rows();
2457
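        // the file stores each value twice, in a PLAIN-encoded column and in a
        // BYTE_STREAM_SPLIT-encoded column; each assertion compares one such pair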
2458        let check_row = |row: Result<Row, ParquetError>| {
2459            assert!(row.is_ok());
2460            let r = row.unwrap();
2461            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2462            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2463            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2464            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2465            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2466            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2467            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2468        };
2469
2470        while start < end {
2471            match iter.next() {
2472                Some(row) => check_row(row),
2473                None => break,
2474            };
2475            start += 1;
2476        }
2477    }
2478
2479    #[test]
2480    fn test_filtered_rowgroup_metadata() {
2481        let message_type = "
2482            message test_schema {
2483                REQUIRED INT32 a;
2484            }
2485        ";
2486        let schema = Arc::new(parse_message_type(message_type).unwrap());
2487        let props = Arc::new(
2488            WriterProperties::builder()
2489                .set_statistics_enabled(EnabledStatistics::Page)
2490                .build(),
2491        );
2492        let mut file: File = tempfile::tempfile().unwrap();
2493        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2494        let data = [1, 2, 3, 4, 5];
2495
2496        // write 5 row groups
2497        for idx in 0..5 {
2498            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2499            let mut row_group_writer = file_writer.next_row_group().unwrap();
2500            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2501                writer
2502                    .typed::<Int32Type>()
2503                    .write_batch(data_i.as_slice(), None, None)
2504                    .unwrap();
2505                writer.close().unwrap();
2506            }
2507            row_group_writer.close().unwrap();
2508            file_writer.flushed_row_groups();
2509        }
2510        let file_metadata = file_writer.close().unwrap();
2511
2512        assert_eq!(file_metadata.num_rows, 25);
2513        assert_eq!(file_metadata.row_groups.len(), 5);
2514
2515        // read only the 3rd row group
2516        let read_options = ReadOptionsBuilder::new()
2517            .with_page_index()
2518            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2519            .build();
2520        let reader =
2521            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2522                .unwrap();
2523        let metadata = reader.metadata();
2524
2525        // check we got the expected row group
2526        assert_eq!(metadata.num_row_groups(), 1);
2527        assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2528
2529        // check we only got the relevant page indexes
2530        assert!(metadata.column_index().is_some());
2531        assert!(metadata.offset_index().is_some());
2532        assert_eq!(metadata.column_index().unwrap().len(), 1);
2533        assert_eq!(metadata.offset_index().unwrap().len(), 1);
2534        let col_idx = metadata.column_index().unwrap();
2535        let off_idx = metadata.offset_index().unwrap();
2536        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2537        let pg_idx = &col_idx[0][0];
2538        let off_idx_i = &off_idx[0][0];
2539
2540        // test that we got the index matching the row group
2541        match pg_idx {
2542            Index::INT32(int_idx) => {
2543                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2544                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2545                assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2546                assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2547            }
2548            _ => panic!("wrong stats type"),
2549        }
2550
2551        // check offset index matches too
2552        assert_eq!(
2553            off_idx_i.page_locations[0].offset,
2554            metadata.row_group(0).column(0).data_page_offset()
2555        );
2556
2557        // read non-contiguous row groups
2558        let read_options = ReadOptionsBuilder::new()
2559            .with_page_index()
2560            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2561            .build();
2562        let reader =
2563            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2564                .unwrap();
2565        let metadata = reader.metadata();
2566
2567        // check we got the expected row groups
2568        assert_eq!(metadata.num_row_groups(), 2);
2569        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2570        assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2571
2572        // check we only got the relevant page indexes
2573        assert!(metadata.column_index().is_some());
2574        assert!(metadata.offset_index().is_some());
2575        assert_eq!(metadata.column_index().unwrap().len(), 2);
2576        assert_eq!(metadata.offset_index().unwrap().len(), 2);
2577        let col_idx = metadata.column_index().unwrap();
2578        let off_idx = metadata.offset_index().unwrap();
2579
2580        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2581            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2582            let pg_idx = &col_idx_i[0];
2583            let off_idx_i = &off_idx[i][0];
2584
2585            // test that we got the index matching the row group
2586            match pg_idx {
2587                Index::INT32(int_idx) => {
2588                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2589                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2590                    assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2591                    assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2592                }
2593                _ => panic!("wrong stats type"),
2594            }
2595
2596            // check offset index matches too
2597            assert_eq!(
2598                off_idx_i.page_locations[0].offset,
2599                metadata.row_group(i).column(0).data_page_offset()
2600            );
2601        }
2602    }
2603}