parquet/file/
serialized_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
19//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
20
21use crate::basic::{Encoding, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{create_codec, Codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
27use crate::errors::{ParquetError, Result};
28use crate::file::page_index::offset_index::OffsetIndexMetaData;
29use crate::file::{
30    metadata::*,
31    properties::{ReaderProperties, ReaderPropertiesPtr},
32    reader::*,
33    statistics,
34};
35use crate::format::{PageHeader, PageLocation, PageType};
36use crate::record::reader::RowIter;
37use crate::record::Row;
38use crate::schema::types::Type as SchemaType;
39#[cfg(feature = "encryption")]
40use crate::thrift::TCompactSliceInputProtocol;
41use crate::thrift::TSerializable;
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::iter;
45use std::{fs::File, io::Read, path::Path, sync::Arc};
46use thrift::protocol::TCompactInputProtocol;
47
48impl TryFrom<File> for SerializedFileReader<File> {
49    type Error = ParquetError;
50
51    fn try_from(file: File) -> Result<Self> {
52        Self::new(file)
53    }
54}
55
56impl TryFrom<&Path> for SerializedFileReader<File> {
57    type Error = ParquetError;
58
59    fn try_from(path: &Path) -> Result<Self> {
60        let file = File::open(path)?;
61        Self::try_from(file)
62    }
63}
64
65impl TryFrom<String> for SerializedFileReader<File> {
66    type Error = ParquetError;
67
68    fn try_from(path: String) -> Result<Self> {
69        Self::try_from(Path::new(&path))
70    }
71}
72
73impl TryFrom<&str> for SerializedFileReader<File> {
74    type Error = ParquetError;
75
76    fn try_from(path: &str) -> Result<Self> {
77        Self::try_from(Path::new(&path))
78    }
79}
80
81/// Conversion into a [`RowIter`]
82/// using the full file schema over all row groups.
83impl IntoIterator for SerializedFileReader<File> {
84    type Item = Result<Row>;
85    type IntoIter = RowIter<'static>;
86
87    fn into_iter(self) -> Self::IntoIter {
88        RowIter::from_file_into(Box::new(self))
89    }
90}
91
92// ----------------------------------------------------------------------
93// Implementations of file & row group readers
94
95/// A serialized implementation for Parquet [`FileReader`].
96pub struct SerializedFileReader<R: ChunkReader> {
97    chunk_reader: Arc<R>,
98    metadata: Arc<ParquetMetaData>,
99    props: ReaderPropertiesPtr,
100}
101
102/// A predicate for filtering row groups, invoked with the metadata and index
103/// of each row group in the file. Only row groups for which the predicate
104/// evaluates to `true` will be scanned
105pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
106
107/// A builder for [`ReadOptions`].
108/// For the predicates that are added to the builder,
109/// they will be chained using 'AND' to filter the row groups.
110#[derive(Default)]
111pub struct ReadOptionsBuilder {
112    predicates: Vec<ReadGroupPredicate>,
113    enable_page_index: bool,
114    props: Option<ReaderProperties>,
115}
116
117impl ReadOptionsBuilder {
118    /// New builder
119    pub fn new() -> Self {
120        Self::default()
121    }
122
123    /// Add a predicate on row group metadata to the reading option,
124    /// Filter only row groups that match the predicate criteria
125    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
126        self.predicates.push(predicate);
127        self
128    }
129
130    /// Add a range predicate on filtering row groups if their midpoints are within
131    /// the Closed-Open range `[start..end) {x | start <= x < end}`
132    pub fn with_range(mut self, start: i64, end: i64) -> Self {
133        assert!(start < end);
134        let predicate = move |rg: &RowGroupMetaData, _: usize| {
135            let mid = get_midpoint_offset(rg);
136            mid >= start && mid < end
137        };
138        self.predicates.push(Box::new(predicate));
139        self
140    }
141
142    /// Enable reading the page index structures described in
143    /// "[Column Index] Layout to Support Page Skipping"
144    ///
145    /// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
146    pub fn with_page_index(mut self) -> Self {
147        self.enable_page_index = true;
148        self
149    }
150
151    /// Set the [`ReaderProperties`] configuration.
152    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
153        self.props = Some(properties);
154        self
155    }
156
157    /// Seal the builder and return the read options
158    pub fn build(self) -> ReadOptions {
159        let props = self
160            .props
161            .unwrap_or_else(|| ReaderProperties::builder().build());
162        ReadOptions {
163            predicates: self.predicates,
164            enable_page_index: self.enable_page_index,
165            props,
166        }
167    }
168}
169
170/// A collection of options for reading a Parquet file.
171///
172/// Currently, only predicates on row group metadata are supported.
173/// All predicates will be chained using 'AND' to filter the row groups.
174pub struct ReadOptions {
175    predicates: Vec<ReadGroupPredicate>,
176    enable_page_index: bool,
177    props: ReaderProperties,
178}
179
180impl<R: 'static + ChunkReader> SerializedFileReader<R> {
181    /// Creates file reader from a Parquet file.
182    /// Returns an error if the Parquet file does not exist or is corrupt.
183    pub fn new(chunk_reader: R) -> Result<Self> {
184        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
185        let props = Arc::new(ReaderProperties::builder().build());
186        Ok(Self {
187            chunk_reader: Arc::new(chunk_reader),
188            metadata: Arc::new(metadata),
189            props,
190        })
191    }
192
193    /// Creates file reader from a Parquet file with read options.
194    /// Returns an error if the Parquet file does not exist or is corrupt.
195    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
196        let mut metadata_builder = ParquetMetaDataReader::new()
197            .parse_and_finish(&chunk_reader)?
198            .into_builder();
199        let mut predicates = options.predicates;
200
201        // Filter row groups based on the predicates
202        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
203            let mut keep = true;
204            for predicate in &mut predicates {
205                if !predicate(&rg_meta, i) {
206                    keep = false;
207                    break;
208                }
209            }
210            if keep {
211                metadata_builder = metadata_builder.add_row_group(rg_meta);
212            }
213        }
214
215        let mut metadata = metadata_builder.build();
216
217        // If page indexes are desired, build them with the filtered set of row groups
218        if options.enable_page_index {
219            let mut reader =
220                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
221            reader.read_page_indexes(&chunk_reader)?;
222            metadata = reader.finish()?;
223        }
224
225        Ok(Self {
226            chunk_reader: Arc::new(chunk_reader),
227            metadata: Arc::new(metadata),
228            props: Arc::new(options.props),
229        })
230    }
231}
232
233/// Get midpoint offset for a row group
234fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
235    let col = meta.column(0);
236    let mut offset = col.data_page_offset();
237    if let Some(dic_offset) = col.dictionary_page_offset() {
238        if offset > dic_offset {
239            offset = dic_offset
240        }
241    };
242    offset + meta.compressed_size() / 2
243}
244
245impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
246    fn metadata(&self) -> &ParquetMetaData {
247        &self.metadata
248    }
249
250    fn num_row_groups(&self) -> usize {
251        self.metadata.num_row_groups()
252    }
253
254    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
255        let row_group_metadata = self.metadata.row_group(i);
256        // Row groups should be processed sequentially.
257        let props = Arc::clone(&self.props);
258        let f = Arc::clone(&self.chunk_reader);
259        Ok(Box::new(SerializedRowGroupReader::new(
260            f,
261            row_group_metadata,
262            self.metadata.offset_index().map(|x| x[i].as_slice()),
263            props,
264        )?))
265    }
266
267    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
268        RowIter::from_file(projection, self)
269    }
270}
271
272/// A serialized implementation for Parquet [`RowGroupReader`].
273pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
274    chunk_reader: Arc<R>,
275    metadata: &'a RowGroupMetaData,
276    offset_index: Option<&'a [OffsetIndexMetaData]>,
277    props: ReaderPropertiesPtr,
278    bloom_filters: Vec<Option<Sbbf>>,
279}
280
281impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
282    /// Creates new row group reader from a file, row group metadata and custom config.
283    pub fn new(
284        chunk_reader: Arc<R>,
285        metadata: &'a RowGroupMetaData,
286        offset_index: Option<&'a [OffsetIndexMetaData]>,
287        props: ReaderPropertiesPtr,
288    ) -> Result<Self> {
289        let bloom_filters = if props.read_bloom_filter() {
290            metadata
291                .columns()
292                .iter()
293                .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
294                .collect::<Result<Vec<_>>>()?
295        } else {
296            iter::repeat(None).take(metadata.columns().len()).collect()
297        };
298        Ok(Self {
299            chunk_reader,
300            metadata,
301            offset_index,
302            props,
303            bloom_filters,
304        })
305    }
306}
307
308impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
309    fn metadata(&self) -> &RowGroupMetaData {
310        self.metadata
311    }
312
313    fn num_columns(&self) -> usize {
314        self.metadata.num_columns()
315    }
316
317    // TODO: fix PARQUET-816
318    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
319        let col = self.metadata.column(i);
320
321        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
322
323        let props = Arc::clone(&self.props);
324        Ok(Box::new(SerializedPageReader::new_with_properties(
325            Arc::clone(&self.chunk_reader),
326            col,
327            usize::try_from(self.metadata.num_rows())?,
328            page_locations,
329            props,
330        )?))
331    }
332
333    /// get bloom filter for the `i`th column
334    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
335        self.bloom_filters[i].as_ref()
336    }
337
338    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
339        RowIter::from_row_group(projection, self)
340    }
341}
342
343/// Decodes a [`Page`] from the provided `buffer`
344pub(crate) fn decode_page(
345    page_header: PageHeader,
346    buffer: Bytes,
347    physical_type: Type,
348    decompressor: Option<&mut Box<dyn Codec>>,
349) -> Result<Page> {
350    // Verify the 32-bit CRC checksum of the page
351    #[cfg(feature = "crc")]
352    if let Some(expected_crc) = page_header.crc {
353        let crc = crc32fast::hash(&buffer);
354        if crc != expected_crc as u32 {
355            return Err(general_err!("Page CRC checksum mismatch"));
356        }
357    }
358
359    // When processing data page v2, depending on enabled compression for the
360    // page, we should account for uncompressed data ('offset') of
361    // repetition and definition levels.
362    //
363    // We always use 0 offset for other pages other than v2, `true` flag means
364    // that compression will be applied if decompressor is defined
365    let mut offset: usize = 0;
366    let mut can_decompress = true;
367
368    if let Some(ref header_v2) = page_header.data_page_header_v2 {
369        if header_v2.definition_levels_byte_length < 0
370            || header_v2.repetition_levels_byte_length < 0
371            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
372                > page_header.uncompressed_page_size
373        {
374            return Err(general_err!(
375                "DataPage v2 header contains implausible values \
376                    for definition_levels_byte_length ({}) \
377                    and repetition_levels_byte_length ({}) \
378                    given DataPage header provides uncompressed_page_size ({})",
379                header_v2.definition_levels_byte_length,
380                header_v2.repetition_levels_byte_length,
381                page_header.uncompressed_page_size
382            ));
383        }
384        offset = usize::try_from(
385            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
386        )?;
387        // When is_compressed flag is missing the page is considered compressed
388        can_decompress = header_v2.is_compressed.unwrap_or(true);
389    }
390
391    // TODO: page header could be huge because of statistics. We should set a
392    // maximum page header size and abort if that is exceeded.
393    let buffer = match decompressor {
394        Some(decompressor) if can_decompress => {
395            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
396            let decompressed_size = uncompressed_page_size - offset;
397            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
398            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
399            if decompressed_size > 0 {
400                let compressed = &buffer.as_ref()[offset..];
401                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
402            }
403
404            if decompressed.len() != uncompressed_page_size {
405                return Err(general_err!(
406                    "Actual decompressed size doesn't match the expected one ({} vs {})",
407                    decompressed.len(),
408                    uncompressed_page_size
409                ));
410            }
411
412            Bytes::from(decompressed)
413        }
414        _ => buffer,
415    };
416
417    let result = match page_header.type_ {
418        PageType::DICTIONARY_PAGE => {
419            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
420                ParquetError::General("Missing dictionary page header".to_string())
421            })?;
422            let is_sorted = dict_header.is_sorted.unwrap_or(false);
423            Page::DictionaryPage {
424                buf: buffer,
425                num_values: dict_header.num_values.try_into()?,
426                encoding: Encoding::try_from(dict_header.encoding)?,
427                is_sorted,
428            }
429        }
430        PageType::DATA_PAGE => {
431            let header = page_header
432                .data_page_header
433                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
434            Page::DataPage {
435                buf: buffer,
436                num_values: header.num_values.try_into()?,
437                encoding: Encoding::try_from(header.encoding)?,
438                def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
439                rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
440                statistics: statistics::from_thrift(physical_type, header.statistics)?,
441            }
442        }
443        PageType::DATA_PAGE_V2 => {
444            let header = page_header
445                .data_page_header_v2
446                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
447            let is_compressed = header.is_compressed.unwrap_or(true);
448            Page::DataPageV2 {
449                buf: buffer,
450                num_values: header.num_values.try_into()?,
451                encoding: Encoding::try_from(header.encoding)?,
452                num_nulls: header.num_nulls.try_into()?,
453                num_rows: header.num_rows.try_into()?,
454                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
455                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
456                is_compressed,
457                statistics: statistics::from_thrift(physical_type, header.statistics)?,
458            }
459        }
460        _ => {
461            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
462            unimplemented!("Page type {:?} is not supported", page_header.type_)
463        }
464    };
465
466    Ok(result)
467}
468
469enum SerializedPageReaderState {
470    Values {
471        /// The current byte offset in the reader
472        offset: usize,
473
474        /// The length of the chunk in bytes
475        remaining_bytes: usize,
476
477        // If the next page header has already been "peeked", we will cache it and it`s length here
478        next_page_header: Option<Box<PageHeader>>,
479
480        /// The index of the data page within this column chunk
481        page_index: usize,
482
483        /// Whether the next page is expected to be a dictionary page
484        require_dictionary: bool,
485    },
486    Pages {
487        /// Remaining page locations
488        page_locations: VecDeque<PageLocation>,
489        /// Remaining dictionary location if any
490        dictionary_page: Option<PageLocation>,
491        /// The total number of rows in this column chunk
492        total_rows: usize,
493        /// The index of the data page within this column chunk
494        page_index: usize,
495    },
496}
497
498#[derive(Default)]
499struct SerializedPageReaderContext {
500    /// Crypto context carrying objects required for decryption
501    #[cfg(feature = "encryption")]
502    crypto_context: Option<Arc<CryptoContext>>,
503}
504
505/// A serialized implementation for Parquet [`PageReader`].
506pub struct SerializedPageReader<R: ChunkReader> {
507    /// The chunk reader
508    reader: Arc<R>,
509
510    /// The compression codec for this column chunk. Only set for non-PLAIN codec.
511    decompressor: Option<Box<dyn Codec>>,
512
513    /// Column chunk type.
514    physical_type: Type,
515
516    state: SerializedPageReaderState,
517
518    context: SerializedPageReaderContext,
519}
520
521impl<R: ChunkReader> SerializedPageReader<R> {
522    /// Creates a new serialized page reader from a chunk reader and metadata
523    pub fn new(
524        reader: Arc<R>,
525        column_chunk_metadata: &ColumnChunkMetaData,
526        total_rows: usize,
527        page_locations: Option<Vec<PageLocation>>,
528    ) -> Result<Self> {
529        let props = Arc::new(ReaderProperties::builder().build());
530        SerializedPageReader::new_with_properties(
531            reader,
532            column_chunk_metadata,
533            total_rows,
534            page_locations,
535            props,
536        )
537    }
538
539    /// Stub No-op implementation when encryption is disabled.
540    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
541    pub(crate) fn add_crypto_context(
542        self,
543        _rg_idx: usize,
544        _column_idx: usize,
545        _parquet_meta_data: &ParquetMetaData,
546        _column_chunk_metadata: &ColumnChunkMetaData,
547    ) -> Result<SerializedPageReader<R>> {
548        Ok(self)
549    }
550
551    /// Adds any necessary crypto context to this page reader, if encryption is enabled.
552    #[cfg(feature = "encryption")]
553    pub(crate) fn add_crypto_context(
554        mut self,
555        rg_idx: usize,
556        column_idx: usize,
557        parquet_meta_data: &ParquetMetaData,
558        column_chunk_metadata: &ColumnChunkMetaData,
559    ) -> Result<SerializedPageReader<R>> {
560        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
561            return Ok(self);
562        };
563        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
564            return Ok(self);
565        };
566        let crypto_context =
567            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
568        self.context.crypto_context = Some(Arc::new(crypto_context));
569        Ok(self)
570    }
571
572    /// Creates a new serialized page with custom options.
573    pub fn new_with_properties(
574        reader: Arc<R>,
575        meta: &ColumnChunkMetaData,
576        total_rows: usize,
577        page_locations: Option<Vec<PageLocation>>,
578        props: ReaderPropertiesPtr,
579    ) -> Result<Self> {
580        let decompressor = create_codec(meta.compression(), props.codec_options())?;
581        let (start, len) = meta.byte_range();
582
583        let state = match page_locations {
584            Some(locations) => {
585                // If the offset of the first page doesn't match the start of the column chunk
586                // then the preceding space must contain a dictionary page.
587                let dictionary_page = match locations.first() {
588                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
589                        offset: start as i64,
590                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
591                        first_row_index: 0,
592                    }),
593                    _ => None,
594                };
595
596                SerializedPageReaderState::Pages {
597                    page_locations: locations.into(),
598                    dictionary_page,
599                    total_rows,
600                    page_index: 0,
601                }
602            }
603            None => SerializedPageReaderState::Values {
604                offset: usize::try_from(start)?,
605                remaining_bytes: usize::try_from(len)?,
606                next_page_header: None,
607                page_index: 0,
608                require_dictionary: meta.dictionary_page_offset().is_some(),
609            },
610        };
611        Ok(Self {
612            reader,
613            decompressor,
614            state,
615            physical_type: meta.column_type(),
616            context: Default::default(),
617        })
618    }
619
620    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
621    /// Unlike page metadata, an offset can uniquely identify a page.
622    ///
623    /// This is used when we need to read parquet with row-filter, and we don't want to decompress the page twice.
624    /// This function allows us to check if the next page is being cached or read previously.
625    #[cfg(test)]
626    fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
627        match &mut self.state {
628            SerializedPageReaderState::Values {
629                offset,
630                remaining_bytes,
631                next_page_header,
632                page_index,
633                require_dictionary,
634            } => {
635                loop {
636                    if *remaining_bytes == 0 {
637                        return Ok(None);
638                    }
639                    return if let Some(header) = next_page_header.as_ref() {
640                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
641                            Ok(Some(*offset))
642                        } else {
643                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
644                            *next_page_header = None;
645                            continue;
646                        }
647                    } else {
648                        let mut read = self.reader.get_read(*offset as u64)?;
649                        let (header_len, header) = Self::read_page_header_len(
650                            &self.context,
651                            &mut read,
652                            *page_index,
653                            *require_dictionary,
654                        )?;
655                        *offset += header_len;
656                        *remaining_bytes -= header_len;
657                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
658                            Ok(Some(*offset))
659                        } else {
660                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
661                            continue;
662                        };
663                        *next_page_header = Some(Box::new(header));
664                        page_meta
665                    };
666                }
667            }
668            SerializedPageReaderState::Pages {
669                page_locations,
670                dictionary_page,
671                ..
672            } => {
673                if let Some(page) = dictionary_page {
674                    Ok(Some(usize::try_from(page.offset)?))
675                } else if let Some(page) = page_locations.front() {
676                    Ok(Some(usize::try_from(page.offset)?))
677                } else {
678                    Ok(None)
679                }
680            }
681        }
682    }
683
684    fn read_page_header_len<T: Read>(
685        context: &SerializedPageReaderContext,
686        input: &mut T,
687        page_index: usize,
688        dictionary_page: bool,
689    ) -> Result<(usize, PageHeader)> {
690        /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
691        struct TrackedRead<R> {
692            inner: R,
693            bytes_read: usize,
694        }
695
696        impl<R: Read> Read for TrackedRead<R> {
697            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
698                let v = self.inner.read(buf)?;
699                self.bytes_read += v;
700                Ok(v)
701            }
702        }
703
704        let mut tracked = TrackedRead {
705            inner: input,
706            bytes_read: 0,
707        };
708        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
709        Ok((tracked.bytes_read, header))
710    }
711
712    fn read_page_header_len_from_bytes(
713        context: &SerializedPageReaderContext,
714        buffer: &[u8],
715        page_index: usize,
716        dictionary_page: bool,
717    ) -> Result<(usize, PageHeader)> {
718        let mut input = std::io::Cursor::new(buffer);
719        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
720        let header_len = input.position() as usize;
721        Ok((header_len, header))
722    }
723}
724
725#[cfg(not(feature = "encryption"))]
726impl SerializedPageReaderContext {
727    fn read_page_header<T: Read>(
728        &self,
729        input: &mut T,
730        _page_index: usize,
731        _dictionary_page: bool,
732    ) -> Result<PageHeader> {
733        let mut prot = TCompactInputProtocol::new(input);
734        Ok(PageHeader::read_from_in_protocol(&mut prot)?)
735    }
736
737    fn decrypt_page_data<T>(
738        &self,
739        buffer: T,
740        _page_index: usize,
741        _dictionary_page: bool,
742    ) -> Result<T> {
743        Ok(buffer)
744    }
745}
746
747#[cfg(feature = "encryption")]
748impl SerializedPageReaderContext {
749    fn read_page_header<T: Read>(
750        &self,
751        input: &mut T,
752        page_index: usize,
753        dictionary_page: bool,
754    ) -> Result<PageHeader> {
755        match self.page_crypto_context(page_index, dictionary_page) {
756            None => {
757                let mut prot = TCompactInputProtocol::new(input);
758                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
759            }
760            Some(page_crypto_context) => {
761                let data_decryptor = page_crypto_context.data_decryptor();
762                let aad = page_crypto_context.create_page_header_aad()?;
763
764                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
765                    ParquetError::General(format!(
766                        "Error decrypting page header for column {}, decryption key may be wrong",
767                        page_crypto_context.column_ordinal
768                    ))
769                })?;
770
771                let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
772                Ok(PageHeader::read_from_in_protocol(&mut prot)?)
773            }
774        }
775    }
776
777    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
778    where
779        T: AsRef<[u8]>,
780        T: From<Vec<u8>>,
781    {
782        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
783        if let Some(page_crypto_context) = page_crypto_context {
784            let decryptor = page_crypto_context.data_decryptor();
785            let aad = page_crypto_context.create_page_aad()?;
786            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
787            Ok(T::from(decrypted))
788        } else {
789            Ok(buffer)
790        }
791    }
792
793    fn page_crypto_context(
794        &self,
795        page_index: usize,
796        dictionary_page: bool,
797    ) -> Option<Arc<CryptoContext>> {
798        self.crypto_context.as_ref().map(|c| {
799            Arc::new(if dictionary_page {
800                c.for_dictionary_page()
801            } else {
802                c.with_page_ordinal(page_index)
803            })
804        })
805    }
806}
807
808impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
809    type Item = Result<Page>;
810
811    fn next(&mut self) -> Option<Self::Item> {
812        self.get_next_page().transpose()
813    }
814}
815
816fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<()> {
817    if header_len > remaining_bytes {
818        return Err(eof_err!("Invalid page header"));
819    }
820    Ok(())
821}
822
823fn verify_page_size(
824    compressed_size: i32,
825    uncompressed_size: i32,
826    remaining_bytes: usize,
827) -> Result<()> {
828    // The page's compressed size should not exceed the remaining bytes that are
829    // available to read. The page's uncompressed size is the expected size
830    // after decompression, which can never be negative.
831    if compressed_size < 0 || compressed_size as usize > remaining_bytes || uncompressed_size < 0 {
832        return Err(eof_err!("Invalid page header"));
833    }
834    Ok(())
835}
836
837impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
838    fn get_next_page(&mut self) -> Result<Option<Page>> {
839        loop {
840            let page = match &mut self.state {
841                SerializedPageReaderState::Values {
842                    offset,
843                    remaining_bytes: remaining,
844                    next_page_header,
845                    page_index,
846                    require_dictionary,
847                } => {
848                    if *remaining == 0 {
849                        return Ok(None);
850                    }
851
852                    let mut read = self.reader.get_read(*offset as u64)?;
853                    let header = if let Some(header) = next_page_header.take() {
854                        *header
855                    } else {
856                        let (header_len, header) = Self::read_page_header_len(
857                            &self.context,
858                            &mut read,
859                            *page_index,
860                            *require_dictionary,
861                        )?;
862                        verify_page_header_len(header_len, *remaining)?;
863                        *offset += header_len;
864                        *remaining -= header_len;
865                        header
866                    };
867                    verify_page_size(
868                        header.compressed_page_size,
869                        header.uncompressed_page_size,
870                        *remaining,
871                    )?;
872                    let data_len = header.compressed_page_size as usize;
873                    *offset += data_len;
874                    *remaining -= data_len;
875
876                    if header.type_ == PageType::INDEX_PAGE {
877                        continue;
878                    }
879
880                    let mut buffer = Vec::with_capacity(data_len);
881                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;
882
883                    if read != data_len {
884                        return Err(eof_err!(
885                            "Expected to read {} bytes of page, read only {}",
886                            data_len,
887                            read
888                        ));
889                    }
890
891                    let buffer =
892                        self.context
893                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;
894
895                    let page = decode_page(
896                        header,
897                        Bytes::from(buffer),
898                        self.physical_type,
899                        self.decompressor.as_mut(),
900                    )?;
901                    if page.is_data_page() {
902                        *page_index += 1;
903                    } else if page.is_dictionary_page() {
904                        *require_dictionary = false;
905                    }
906                    page
907                }
908                SerializedPageReaderState::Pages {
909                    page_locations,
910                    dictionary_page,
911                    page_index,
912                    ..
913                } => {
914                    let (front, is_dictionary_page) = match dictionary_page.take() {
915                        Some(front) => (front, true),
916                        None => match page_locations.pop_front() {
917                            Some(front) => (front, false),
918                            None => return Ok(None),
919                        },
920                    };
921
922                    let page_len = usize::try_from(front.compressed_page_size)?;
923                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;
924
925                    let (offset, header) = Self::read_page_header_len_from_bytes(
926                        &self.context,
927                        buffer.as_ref(),
928                        *page_index,
929                        is_dictionary_page,
930                    )?;
931                    let bytes = buffer.slice(offset..);
932                    let bytes =
933                        self.context
934                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;
935
936                    if !is_dictionary_page {
937                        *page_index += 1;
938                    }
939                    decode_page(
940                        header,
941                        bytes,
942                        self.physical_type,
943                        self.decompressor.as_mut(),
944                    )?
945                }
946            };
947
948            return Ok(Some(page));
949        }
950    }
951
952    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
953        match &mut self.state {
954            SerializedPageReaderState::Values {
955                offset,
956                remaining_bytes,
957                next_page_header,
958                page_index,
959                require_dictionary,
960            } => {
961                loop {
962                    if *remaining_bytes == 0 {
963                        return Ok(None);
964                    }
965                    return if let Some(header) = next_page_header.as_ref() {
966                        if let Ok(page_meta) = (&**header).try_into() {
967                            Ok(Some(page_meta))
968                        } else {
969                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
970                            *next_page_header = None;
971                            continue;
972                        }
973                    } else {
974                        let mut read = self.reader.get_read(*offset as u64)?;
975                        let (header_len, header) = Self::read_page_header_len(
976                            &self.context,
977                            &mut read,
978                            *page_index,
979                            *require_dictionary,
980                        )?;
981                        verify_page_header_len(header_len, *remaining_bytes)?;
982                        *offset += header_len;
983                        *remaining_bytes -= header_len;
984                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
985                            Ok(Some(page_meta))
986                        } else {
987                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
988                            continue;
989                        };
990                        *next_page_header = Some(Box::new(header));
991                        page_meta
992                    };
993                }
994            }
995            SerializedPageReaderState::Pages {
996                page_locations,
997                dictionary_page,
998                total_rows,
999                page_index: _,
1000            } => {
1001                if dictionary_page.is_some() {
1002                    Ok(Some(PageMetadata {
1003                        num_rows: None,
1004                        num_levels: None,
1005                        is_dict: true,
1006                    }))
1007                } else if let Some(page) = page_locations.front() {
1008                    let next_rows = page_locations
1009                        .get(1)
1010                        .map(|x| x.first_row_index as usize)
1011                        .unwrap_or(*total_rows);
1012
1013                    Ok(Some(PageMetadata {
1014                        num_rows: Some(next_rows - page.first_row_index as usize),
1015                        num_levels: None,
1016                        is_dict: false,
1017                    }))
1018                } else {
1019                    Ok(None)
1020                }
1021            }
1022        }
1023    }
1024
1025    fn skip_next_page(&mut self) -> Result<()> {
1026        match &mut self.state {
1027            SerializedPageReaderState::Values {
1028                offset,
1029                remaining_bytes,
1030                next_page_header,
1031                page_index,
1032                require_dictionary,
1033            } => {
1034                if let Some(buffered_header) = next_page_header.take() {
1035                    verify_page_size(
1036                        buffered_header.compressed_page_size,
1037                        buffered_header.uncompressed_page_size,
1038                        *remaining_bytes,
1039                    )?;
1040                    // The next page header has already been peeked, so just advance the offset
1041                    *offset += buffered_header.compressed_page_size as usize;
1042                    *remaining_bytes -= buffered_header.compressed_page_size as usize;
1043                } else {
1044                    let mut read = self.reader.get_read(*offset as u64)?;
1045                    let (header_len, header) = Self::read_page_header_len(
1046                        &self.context,
1047                        &mut read,
1048                        *page_index,
1049                        *require_dictionary,
1050                    )?;
1051                    verify_page_header_len(header_len, *remaining_bytes)?;
1052                    verify_page_size(
1053                        header.compressed_page_size,
1054                        header.uncompressed_page_size,
1055                        *remaining_bytes,
1056                    )?;
1057                    let data_page_size = header.compressed_page_size as usize;
1058                    *offset += header_len + data_page_size;
1059                    *remaining_bytes -= header_len + data_page_size;
1060                }
1061                if *require_dictionary {
1062                    *require_dictionary = false;
1063                } else {
1064                    *page_index += 1;
1065                }
1066                Ok(())
1067            }
1068            SerializedPageReaderState::Pages {
1069                page_locations,
1070                dictionary_page,
1071                page_index,
1072                ..
1073            } => {
1074                if dictionary_page.is_some() {
1075                    // If a dictionary page exists, consume it by taking it (sets to None)
1076                    dictionary_page.take();
1077                } else {
1078                    // If no dictionary page exists, simply pop the data page from page_locations
1079                    if page_locations.pop_front().is_some() {
1080                        *page_index += 1;
1081                    }
1082                }
1083
1084                Ok(())
1085            }
1086        }
1087    }
1088
1089    fn at_record_boundary(&mut self) -> Result<bool> {
1090        match &mut self.state {
1091            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
1092            SerializedPageReaderState::Pages { .. } => Ok(true),
1093        }
1094    }
1095}
1096
1097#[cfg(test)]
1098mod tests {
1099    use std::collections::HashSet;
1100
1101    use bytes::Buf;
1102
1103    use crate::file::properties::{EnabledStatistics, WriterProperties};
1104    use crate::format::BoundaryOrder;
1105
1106    use crate::basic::{self, ColumnOrder, SortOrder};
1107    use crate::column::reader::ColumnReader;
1108    use crate::data_type::private::ParquetValueType;
1109    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1110    use crate::file::page_index::index::{Index, NativeIndex};
1111    #[allow(deprecated)]
1112    use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1113    use crate::file::writer::SerializedFileWriter;
1114    use crate::record::RowAccessor;
1115    use crate::schema::parser::parse_message_type;
1116    use crate::util::test_common::file_util::{get_test_file, get_test_path};
1117
1118    use super::*;
1119
1120    #[test]
1121    fn test_cursor_and_file_has_the_same_behaviour() {
1122        let mut buf: Vec<u8> = Vec::new();
1123        get_test_file("alltypes_plain.parquet")
1124            .read_to_end(&mut buf)
1125            .unwrap();
1126        let cursor = Bytes::from(buf);
1127        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
1128
1129        let test_file = get_test_file("alltypes_plain.parquet");
1130        let read_from_file = SerializedFileReader::new(test_file).unwrap();
1131
1132        let file_iter = read_from_file.get_row_iter(None).unwrap();
1133        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
1134
1135        for (a, b) in file_iter.zip(cursor_iter) {
1136            assert_eq!(a.unwrap(), b.unwrap())
1137        }
1138    }
1139
1140    #[test]
1141    fn test_file_reader_try_from() {
1142        // Valid file path
1143        let test_file = get_test_file("alltypes_plain.parquet");
1144        let test_path_buf = get_test_path("alltypes_plain.parquet");
1145        let test_path = test_path_buf.as_path();
1146        let test_path_str = test_path.to_str().unwrap();
1147
1148        let reader = SerializedFileReader::try_from(test_file);
1149        assert!(reader.is_ok());
1150
1151        let reader = SerializedFileReader::try_from(test_path);
1152        assert!(reader.is_ok());
1153
1154        let reader = SerializedFileReader::try_from(test_path_str);
1155        assert!(reader.is_ok());
1156
1157        let reader = SerializedFileReader::try_from(test_path_str.to_string());
1158        assert!(reader.is_ok());
1159
1160        // Invalid file path
1161        let test_path = Path::new("invalid.parquet");
1162        let test_path_str = test_path.to_str().unwrap();
1163
1164        let reader = SerializedFileReader::try_from(test_path);
1165        assert!(reader.is_err());
1166
1167        let reader = SerializedFileReader::try_from(test_path_str);
1168        assert!(reader.is_err());
1169
1170        let reader = SerializedFileReader::try_from(test_path_str.to_string());
1171        assert!(reader.is_err());
1172    }
1173
1174    #[test]
1175    fn test_file_reader_into_iter() {
1176        let path = get_test_path("alltypes_plain.parquet");
1177        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1178        let iter = reader.into_iter();
1179        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1180
1181        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1182    }
1183
1184    #[test]
1185    fn test_file_reader_into_iter_project() {
1186        let path = get_test_path("alltypes_plain.parquet");
1187        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1188        let schema = "message schema { OPTIONAL INT32 id; }";
1189        let proj = parse_message_type(schema).ok();
1190        let iter = reader.into_iter().project(proj).unwrap();
1191        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1192
1193        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1194    }
1195
1196    #[test]
1197    fn test_reuse_file_chunk() {
1198        // This test covers the case of maintaining the correct start position in a file
1199        // stream for each column reader after initializing and moving to the next one
1200        // (without necessarily reading the entire column).
1201        let test_file = get_test_file("alltypes_plain.parquet");
1202        let reader = SerializedFileReader::new(test_file).unwrap();
1203        let row_group = reader.get_row_group(0).unwrap();
1204
1205        let mut page_readers = Vec::new();
1206        for i in 0..row_group.num_columns() {
1207            page_readers.push(row_group.get_column_page_reader(i).unwrap());
1208        }
1209
1210        // Now buffer each col reader, we do not expect any failures like:
1211        // General("underlying Thrift error: end of file")
1212        for mut page_reader in page_readers {
1213            assert!(page_reader.get_next_page().is_ok());
1214        }
1215    }
1216
1217    #[test]
1218    fn test_file_reader() {
1219        let test_file = get_test_file("alltypes_plain.parquet");
1220        let reader_result = SerializedFileReader::new(test_file);
1221        assert!(reader_result.is_ok());
1222        let reader = reader_result.unwrap();
1223
1224        // Test contents in Parquet metadata
1225        let metadata = reader.metadata();
1226        assert_eq!(metadata.num_row_groups(), 1);
1227
1228        // Test contents in file metadata
1229        let file_metadata = metadata.file_metadata();
1230        assert!(file_metadata.created_by().is_some());
1231        assert_eq!(
1232            file_metadata.created_by().unwrap(),
1233            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
1234        );
1235        assert!(file_metadata.key_value_metadata().is_none());
1236        assert_eq!(file_metadata.num_rows(), 8);
1237        assert_eq!(file_metadata.version(), 1);
1238        assert_eq!(file_metadata.column_orders(), None);
1239
1240        // Test contents in row group metadata
1241        let row_group_metadata = metadata.row_group(0);
1242        assert_eq!(row_group_metadata.num_columns(), 11);
1243        assert_eq!(row_group_metadata.num_rows(), 8);
1244        assert_eq!(row_group_metadata.total_byte_size(), 671);
1245        // Check each column order
1246        for i in 0..row_group_metadata.num_columns() {
1247            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
1248        }
1249
1250        // Test row group reader
1251        let row_group_reader_result = reader.get_row_group(0);
1252        assert!(row_group_reader_result.is_ok());
1253        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1254        assert_eq!(
1255            row_group_reader.num_columns(),
1256            row_group_metadata.num_columns()
1257        );
1258        assert_eq!(
1259            row_group_reader.metadata().total_byte_size(),
1260            row_group_metadata.total_byte_size()
1261        );
1262
1263        // Test page readers
1264        // TODO: test for every column
1265        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1266        assert!(page_reader_0_result.is_ok());
1267        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1268        let mut page_count = 0;
1269        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1270            let is_expected_page = match page {
1271                Page::DictionaryPage {
1272                    buf,
1273                    num_values,
1274                    encoding,
1275                    is_sorted,
1276                } => {
1277                    assert_eq!(buf.len(), 32);
1278                    assert_eq!(num_values, 8);
1279                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
1280                    assert!(!is_sorted);
1281                    true
1282                }
1283                Page::DataPage {
1284                    buf,
1285                    num_values,
1286                    encoding,
1287                    def_level_encoding,
1288                    rep_level_encoding,
1289                    statistics,
1290                } => {
1291                    assert_eq!(buf.len(), 11);
1292                    assert_eq!(num_values, 8);
1293                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
1294                    assert_eq!(def_level_encoding, Encoding::RLE);
1295                    #[allow(deprecated)]
1296                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
1297                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
1298                    assert!(statistics.is_none());
1299                    true
1300                }
1301                _ => false,
1302            };
1303            assert!(is_expected_page);
1304            page_count += 1;
1305        }
1306        assert_eq!(page_count, 2);
1307    }
1308
1309    #[test]
1310    fn test_file_reader_datapage_v2() {
1311        let test_file = get_test_file("datapage_v2.snappy.parquet");
1312        let reader_result = SerializedFileReader::new(test_file);
1313        assert!(reader_result.is_ok());
1314        let reader = reader_result.unwrap();
1315
1316        // Test contents in Parquet metadata
1317        let metadata = reader.metadata();
1318        assert_eq!(metadata.num_row_groups(), 1);
1319
1320        // Test contents in file metadata
1321        let file_metadata = metadata.file_metadata();
1322        assert!(file_metadata.created_by().is_some());
1323        assert_eq!(
1324            file_metadata.created_by().unwrap(),
1325            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
1326        );
1327        assert!(file_metadata.key_value_metadata().is_some());
1328        assert_eq!(
1329            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1330            1
1331        );
1332
1333        assert_eq!(file_metadata.num_rows(), 5);
1334        assert_eq!(file_metadata.version(), 1);
1335        assert_eq!(file_metadata.column_orders(), None);
1336
1337        let row_group_metadata = metadata.row_group(0);
1338
1339        // Check each column order
1340        for i in 0..row_group_metadata.num_columns() {
1341            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
1342        }
1343
1344        // Test row group reader
1345        let row_group_reader_result = reader.get_row_group(0);
1346        assert!(row_group_reader_result.is_ok());
1347        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1348        assert_eq!(
1349            row_group_reader.num_columns(),
1350            row_group_metadata.num_columns()
1351        );
1352        assert_eq!(
1353            row_group_reader.metadata().total_byte_size(),
1354            row_group_metadata.total_byte_size()
1355        );
1356
1357        // Test page readers
1358        // TODO: test for every column
1359        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1360        assert!(page_reader_0_result.is_ok());
1361        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1362        let mut page_count = 0;
1363        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1364            let is_expected_page = match page {
1365                Page::DictionaryPage {
1366                    buf,
1367                    num_values,
1368                    encoding,
1369                    is_sorted,
1370                } => {
1371                    assert_eq!(buf.len(), 7);
1372                    assert_eq!(num_values, 1);
1373                    assert_eq!(encoding, Encoding::PLAIN);
1374                    assert!(!is_sorted);
1375                    true
1376                }
1377                Page::DataPageV2 {
1378                    buf,
1379                    num_values,
1380                    encoding,
1381                    num_nulls,
1382                    num_rows,
1383                    def_levels_byte_len,
1384                    rep_levels_byte_len,
1385                    is_compressed,
1386                    statistics,
1387                } => {
1388                    assert_eq!(buf.len(), 4);
1389                    assert_eq!(num_values, 5);
1390                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1391                    assert_eq!(num_nulls, 1);
1392                    assert_eq!(num_rows, 5);
1393                    assert_eq!(def_levels_byte_len, 2);
1394                    assert_eq!(rep_levels_byte_len, 0);
1395                    assert!(is_compressed);
1396                    assert!(statistics.is_some());
1397                    true
1398                }
1399                _ => false,
1400            };
1401            assert!(is_expected_page);
1402            page_count += 1;
1403        }
1404        assert_eq!(page_count, 2);
1405    }
1406
1407    #[test]
1408    fn test_file_reader_empty_compressed_datapage_v2() {
1409        // this file has a compressed datapage that un-compresses to 0 bytes
1410        let test_file = get_test_file("page_v2_empty_compressed.parquet");
1411        let reader_result = SerializedFileReader::new(test_file);
1412        assert!(reader_result.is_ok());
1413        let reader = reader_result.unwrap();
1414
1415        // Test contents in Parquet metadata
1416        let metadata = reader.metadata();
1417        assert_eq!(metadata.num_row_groups(), 1);
1418
1419        // Test contents in file metadata
1420        let file_metadata = metadata.file_metadata();
1421        assert!(file_metadata.created_by().is_some());
1422        assert_eq!(
1423            file_metadata.created_by().unwrap(),
1424            "parquet-cpp-arrow version 14.0.2"
1425        );
1426        assert!(file_metadata.key_value_metadata().is_some());
1427        assert_eq!(
1428            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1429            1
1430        );
1431
1432        assert_eq!(file_metadata.num_rows(), 10);
1433        assert_eq!(file_metadata.version(), 2);
1434        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1435        assert_eq!(
1436            file_metadata.column_orders(),
1437            Some(vec![expected_order].as_ref())
1438        );
1439
1440        let row_group_metadata = metadata.row_group(0);
1441
1442        // Check each column order
1443        for i in 0..row_group_metadata.num_columns() {
1444            assert_eq!(file_metadata.column_order(i), expected_order);
1445        }
1446
1447        // Test row group reader
1448        let row_group_reader_result = reader.get_row_group(0);
1449        assert!(row_group_reader_result.is_ok());
1450        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1451        assert_eq!(
1452            row_group_reader.num_columns(),
1453            row_group_metadata.num_columns()
1454        );
1455        assert_eq!(
1456            row_group_reader.metadata().total_byte_size(),
1457            row_group_metadata.total_byte_size()
1458        );
1459
1460        // Test page readers
1461        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1462        assert!(page_reader_0_result.is_ok());
1463        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1464        let mut page_count = 0;
1465        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1466            let is_expected_page = match page {
1467                Page::DictionaryPage {
1468                    buf,
1469                    num_values,
1470                    encoding,
1471                    is_sorted,
1472                } => {
1473                    assert_eq!(buf.len(), 0);
1474                    assert_eq!(num_values, 0);
1475                    assert_eq!(encoding, Encoding::PLAIN);
1476                    assert!(!is_sorted);
1477                    true
1478                }
1479                Page::DataPageV2 {
1480                    buf,
1481                    num_values,
1482                    encoding,
1483                    num_nulls,
1484                    num_rows,
1485                    def_levels_byte_len,
1486                    rep_levels_byte_len,
1487                    is_compressed,
1488                    statistics,
1489                } => {
1490                    assert_eq!(buf.len(), 3);
1491                    assert_eq!(num_values, 10);
1492                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1493                    assert_eq!(num_nulls, 10);
1494                    assert_eq!(num_rows, 10);
1495                    assert_eq!(def_levels_byte_len, 2);
1496                    assert_eq!(rep_levels_byte_len, 0);
1497                    assert!(is_compressed);
1498                    assert!(statistics.is_some());
1499                    true
1500                }
1501                _ => false,
1502            };
1503            assert!(is_expected_page);
1504            page_count += 1;
1505        }
1506        assert_eq!(page_count, 2);
1507    }
1508
1509    #[test]
1510    fn test_file_reader_empty_datapage_v2() {
1511        // this file has 0 bytes compressed datapage that un-compresses to 0 bytes
1512        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
1513        let reader_result = SerializedFileReader::new(test_file);
1514        assert!(reader_result.is_ok());
1515        let reader = reader_result.unwrap();
1516
1517        // Test contents in Parquet metadata
1518        let metadata = reader.metadata();
1519        assert_eq!(metadata.num_row_groups(), 1);
1520
1521        // Test contents in file metadata
1522        let file_metadata = metadata.file_metadata();
1523        assert!(file_metadata.created_by().is_some());
1524        assert_eq!(
1525            file_metadata.created_by().unwrap(),
1526            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
1527        );
1528        assert!(file_metadata.key_value_metadata().is_some());
1529        assert_eq!(
1530            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1531            2
1532        );
1533
1534        assert_eq!(file_metadata.num_rows(), 1);
1535        assert_eq!(file_metadata.version(), 1);
1536        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1537        assert_eq!(
1538            file_metadata.column_orders(),
1539            Some(vec![expected_order].as_ref())
1540        );
1541
1542        let row_group_metadata = metadata.row_group(0);
1543
1544        // Check each column order
1545        for i in 0..row_group_metadata.num_columns() {
1546            assert_eq!(file_metadata.column_order(i), expected_order);
1547        }
1548
1549        // Test row group reader
1550        let row_group_reader_result = reader.get_row_group(0);
1551        assert!(row_group_reader_result.is_ok());
1552        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1553        assert_eq!(
1554            row_group_reader.num_columns(),
1555            row_group_metadata.num_columns()
1556        );
1557        assert_eq!(
1558            row_group_reader.metadata().total_byte_size(),
1559            row_group_metadata.total_byte_size()
1560        );
1561
1562        // Test page readers
1563        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1564        assert!(page_reader_0_result.is_ok());
1565        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1566        let mut page_count = 0;
1567        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1568            let is_expected_page = match page {
1569                Page::DataPageV2 {
1570                    buf,
1571                    num_values,
1572                    encoding,
1573                    num_nulls,
1574                    num_rows,
1575                    def_levels_byte_len,
1576                    rep_levels_byte_len,
1577                    is_compressed,
1578                    statistics,
1579                } => {
1580                    assert_eq!(buf.len(), 2);
1581                    assert_eq!(num_values, 1);
1582                    assert_eq!(encoding, Encoding::PLAIN);
1583                    assert_eq!(num_nulls, 1);
1584                    assert_eq!(num_rows, 1);
1585                    assert_eq!(def_levels_byte_len, 2);
1586                    assert_eq!(rep_levels_byte_len, 0);
1587                    assert!(is_compressed);
1588                    assert!(statistics.is_none());
1589                    true
1590                }
1591                _ => false,
1592            };
1593            assert!(is_expected_page);
1594            page_count += 1;
1595        }
1596        assert_eq!(page_count, 1);
1597    }
1598
1599    fn get_serialized_page_reader<R: ChunkReader>(
1600        file_reader: &SerializedFileReader<R>,
1601        row_group: usize,
1602        column: usize,
1603    ) -> Result<SerializedPageReader<R>> {
1604        let row_group = {
1605            let row_group_metadata = file_reader.metadata.row_group(row_group);
1606            let props = Arc::clone(&file_reader.props);
1607            let f = Arc::clone(&file_reader.chunk_reader);
1608            SerializedRowGroupReader::new(
1609                f,
1610                row_group_metadata,
1611                file_reader
1612                    .metadata
1613                    .offset_index()
1614                    .map(|x| x[row_group].as_slice()),
1615                props,
1616            )?
1617        };
1618
1619        let col = row_group.metadata.column(column);
1620
1621        let page_locations = row_group
1622            .offset_index
1623            .map(|x| x[column].page_locations.clone());
1624
1625        let props = Arc::clone(&row_group.props);
1626        SerializedPageReader::new_with_properties(
1627            Arc::clone(&row_group.chunk_reader),
1628            col,
1629            usize::try_from(row_group.metadata.num_rows())?,
1630            page_locations,
1631            props,
1632        )
1633    }
1634
1635    #[test]
1636    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
1637        let test_file = get_test_file("alltypes_plain.parquet");
1638        let reader = SerializedFileReader::new(test_file)?;
1639
1640        let mut offset_set = HashSet::new();
1641        let num_row_groups = reader.metadata.num_row_groups();
1642        for row_group in 0..num_row_groups {
1643            let num_columns = reader.metadata.row_group(row_group).num_columns();
1644            for column in 0..num_columns {
1645                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;
1646
1647                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
1648                    match &page_reader.state {
1649                        SerializedPageReaderState::Pages {
1650                            page_locations,
1651                            dictionary_page,
1652                            ..
1653                        } => {
1654                            if let Some(page) = dictionary_page {
1655                                assert_eq!(page.offset as usize, page_offset);
1656                            } else if let Some(page) = page_locations.front() {
1657                                assert_eq!(page.offset as usize, page_offset);
1658                            } else {
1659                                unreachable!()
1660                            }
1661                        }
1662                        SerializedPageReaderState::Values {
1663                            offset,
1664                            next_page_header,
1665                            ..
1666                        } => {
1667                            assert!(next_page_header.is_some());
1668                            assert_eq!(*offset, page_offset);
1669                        }
1670                    }
1671                    let page = page_reader.get_next_page()?;
1672                    assert!(page.is_some());
1673                    let newly_inserted = offset_set.insert(page_offset);
1674                    assert!(newly_inserted);
1675                }
1676            }
1677        }
1678
1679        Ok(())
1680    }
1681
1682    #[test]
1683    fn test_page_iterator() {
1684        let file = get_test_file("alltypes_plain.parquet");
1685        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1686
1687        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1688
1689        // read first page
1690        let page = page_iterator.next();
1691        assert!(page.is_some());
1692        assert!(page.unwrap().is_ok());
1693
1694        // reach end of file
1695        let page = page_iterator.next();
1696        assert!(page.is_none());
1697
1698        let row_group_indices = Box::new(0..1);
1699        let mut page_iterator =
1700            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1701
1702        // read first page
1703        let page = page_iterator.next();
1704        assert!(page.is_some());
1705        assert!(page.unwrap().is_ok());
1706
1707        // reach end of file
1708        let page = page_iterator.next();
1709        assert!(page.is_none());
1710    }
1711
1712    #[test]
1713    fn test_file_reader_key_value_metadata() {
1714        let file = get_test_file("binary.parquet");
1715        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1716
1717        let metadata = file_reader
1718            .metadata
1719            .file_metadata()
1720            .key_value_metadata()
1721            .unwrap();
1722
1723        assert_eq!(metadata.len(), 3);
1724
1725        assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1726
1727        assert_eq!(metadata[1].key, "writer.model.name");
1728        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1729
1730        assert_eq!(metadata[2].key, "parquet.proto.class");
1731        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1732    }
1733
1734    #[test]
1735    fn test_file_reader_optional_metadata() {
1736        // file with optional metadata: bloom filters, encoding stats, column index and offset index.
1737        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1738        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1739
1740        let row_group_metadata = file_reader.metadata.row_group(0);
1741        let col0_metadata = row_group_metadata.column(0);
1742
1743        // test optional bloom filter offset
1744        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1745
1746        // test page encoding stats
1747        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1748
1749        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1750        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1751        assert_eq!(page_encoding_stats.count, 1);
1752
1753        // test optional column index offset
1754        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1755        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1756
1757        // test optional offset index offset
1758        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1759        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1760    }
1761
1762    #[test]
1763    fn test_file_reader_with_no_filter() -> Result<()> {
1764        let test_file = get_test_file("alltypes_plain.parquet");
1765        let origin_reader = SerializedFileReader::new(test_file)?;
1766        // test initial number of row groups
1767        let metadata = origin_reader.metadata();
1768        assert_eq!(metadata.num_row_groups(), 1);
1769        Ok(())
1770    }
1771
1772    #[test]
1773    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1774        let test_file = get_test_file("alltypes_plain.parquet");
1775        let read_options = ReadOptionsBuilder::new()
1776            .with_predicate(Box::new(|_, _| false))
1777            .build();
1778        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1779        let metadata = reader.metadata();
1780        assert_eq!(metadata.num_row_groups(), 0);
1781        Ok(())
1782    }
1783
1784    #[test]
1785    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1786        let test_file = get_test_file("alltypes_plain.parquet");
1787        let origin_reader = SerializedFileReader::new(test_file)?;
1788        // test initial number of row groups
1789        let metadata = origin_reader.metadata();
1790        assert_eq!(metadata.num_row_groups(), 1);
1791        let mid = get_midpoint_offset(metadata.row_group(0));
1792
1793        let test_file = get_test_file("alltypes_plain.parquet");
1794        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1795        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1796        let metadata = reader.metadata();
1797        assert_eq!(metadata.num_row_groups(), 1);
1798
1799        let test_file = get_test_file("alltypes_plain.parquet");
1800        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1801        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1802        let metadata = reader.metadata();
1803        assert_eq!(metadata.num_row_groups(), 0);
1804        Ok(())
1805    }
1806
1807    #[test]
1808    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1809        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1810        let origin_reader = SerializedFileReader::new(test_file)?;
1811        let metadata = origin_reader.metadata();
1812        let mid = get_midpoint_offset(metadata.row_group(0));
1813
1814        // true, true predicate
1815        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1816        let read_options = ReadOptionsBuilder::new()
1817            .with_page_index()
1818            .with_predicate(Box::new(|_, _| true))
1819            .with_range(mid, mid + 1)
1820            .build();
1821        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1822        let metadata = reader.metadata();
1823        assert_eq!(metadata.num_row_groups(), 1);
1824        assert_eq!(metadata.column_index().unwrap().len(), 1);
1825        assert_eq!(metadata.offset_index().unwrap().len(), 1);
1826
1827        // true, false predicate
1828        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1829        let read_options = ReadOptionsBuilder::new()
1830            .with_page_index()
1831            .with_predicate(Box::new(|_, _| true))
1832            .with_range(0, mid)
1833            .build();
1834        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1835        let metadata = reader.metadata();
1836        assert_eq!(metadata.num_row_groups(), 0);
1837        assert!(metadata.column_index().is_none());
1838        assert!(metadata.offset_index().is_none());
1839
1840        // false, true predicate
1841        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1842        let read_options = ReadOptionsBuilder::new()
1843            .with_page_index()
1844            .with_predicate(Box::new(|_, _| false))
1845            .with_range(mid, mid + 1)
1846            .build();
1847        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1848        let metadata = reader.metadata();
1849        assert_eq!(metadata.num_row_groups(), 0);
1850        assert!(metadata.column_index().is_none());
1851        assert!(metadata.offset_index().is_none());
1852
1853        // false, false predicate
1854        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1855        let read_options = ReadOptionsBuilder::new()
1856            .with_page_index()
1857            .with_predicate(Box::new(|_, _| false))
1858            .with_range(0, mid)
1859            .build();
1860        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1861        let metadata = reader.metadata();
1862        assert_eq!(metadata.num_row_groups(), 0);
1863        assert!(metadata.column_index().is_none());
1864        assert!(metadata.offset_index().is_none());
1865        Ok(())
1866    }
1867
1868    #[test]
1869    fn test_file_reader_invalid_metadata() {
1870        let data = [
1871            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1872            80, 65, 82, 49,
1873        ];
1874        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1875        assert_eq!(
1876            ret.err().unwrap().to_string(),
1877            "Parquet error: Could not parse metadata: bad data"
1878        );
1879    }
1880
1881    #[test]
1882    // Use java parquet-tools get below pageIndex info
1883    // !```
1884    // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
1885    // row group 0:
1886    // column index for column String:
1887    // Boundary order: ASCENDING
1888    // page-0  :
1889    // null count                 min                                  max
1890    // 0                          Hello                                today
1891    //
1892    // offset index for column String:
1893    // page-0   :
1894    // offset   compressed size       first row index
1895    // 4               152                     0
1896    ///```
1897    //
1898    fn test_page_index_reader() {
1899        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
1900        let builder = ReadOptionsBuilder::new();
1901        //enable read page index
1902        let options = builder.with_page_index().build();
1903        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1904        let reader = reader_result.unwrap();
1905
1906        // Test contents in Parquet metadata
1907        let metadata = reader.metadata();
1908        assert_eq!(metadata.num_row_groups(), 1);
1909
1910        let column_index = metadata.column_index().unwrap();
1911
1912        // only one row group
1913        assert_eq!(column_index.len(), 1);
1914        let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
1915            index
1916        } else {
1917            unreachable!()
1918        };
1919
1920        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
1921        let index_in_pages = &index.indexes;
1922
1923        //only one page group
1924        assert_eq!(index_in_pages.len(), 1);
1925
1926        let page0 = &index_in_pages[0];
1927        let min = page0.min.as_ref().unwrap();
1928        let max = page0.max.as_ref().unwrap();
1929        assert_eq!(b"Hello", min.as_bytes());
1930        assert_eq!(b"today", max.as_bytes());
1931
1932        let offset_indexes = metadata.offset_index().unwrap();
1933        // only one row group
1934        assert_eq!(offset_indexes.len(), 1);
1935        let offset_index = &offset_indexes[0];
1936        let page_offset = &offset_index[0].page_locations()[0];
1937
1938        assert_eq!(4, page_offset.offset);
1939        assert_eq!(152, page_offset.compressed_page_size);
1940        assert_eq!(0, page_offset.first_row_index);
1941    }
1942
1943    #[test]
1944    #[allow(deprecated)]
1945    fn test_page_index_reader_out_of_order() {
1946        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1947        let options = ReadOptionsBuilder::new().with_page_index().build();
1948        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
1949        let metadata = reader.metadata();
1950
1951        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1952        let columns = metadata.row_group(0).columns();
1953        let reversed: Vec<_> = columns.iter().cloned().rev().collect();
1954
1955        let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
1956        let mut b = read_columns_indexes(&test_file, &reversed)
1957            .unwrap()
1958            .unwrap();
1959        b.reverse();
1960        assert_eq!(a, b);
1961
1962        let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
1963        let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
1964        b.reverse();
1965        assert_eq!(a, b);
1966    }
1967
1968    #[test]
1969    fn test_page_index_reader_all_type() {
1970        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1971        let builder = ReadOptionsBuilder::new();
1972        //enable read page index
1973        let options = builder.with_page_index().build();
1974        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1975        let reader = reader_result.unwrap();
1976
1977        // Test contents in Parquet metadata
1978        let metadata = reader.metadata();
1979        assert_eq!(metadata.num_row_groups(), 1);
1980
1981        let column_index = metadata.column_index().unwrap();
1982        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
1983
1984        // only one row group
1985        assert_eq!(column_index.len(), 1);
1986        let row_group_metadata = metadata.row_group(0);
1987
1988        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
1989        assert!(!&column_index[0][0].is_sorted());
1990        let boundary_order = &column_index[0][0].get_boundary_order();
1991        assert!(boundary_order.is_some());
1992        matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
1993        if let Index::INT32(index) = &column_index[0][0] {
1994            check_native_page_index(
1995                index,
1996                325,
1997                get_row_group_min_max_bytes(row_group_metadata, 0),
1998                BoundaryOrder::UNORDERED,
1999            );
2000            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2001        } else {
2002            unreachable!()
2003        };
2004        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
2005        assert!(&column_index[0][1].is_sorted());
2006        if let Index::BOOLEAN(index) = &column_index[0][1] {
2007            assert_eq!(index.indexes.len(), 82);
2008            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2009        } else {
2010            unreachable!()
2011        };
2012        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2013        assert!(&column_index[0][2].is_sorted());
2014        if let Index::INT32(index) = &column_index[0][2] {
2015            check_native_page_index(
2016                index,
2017                325,
2018                get_row_group_min_max_bytes(row_group_metadata, 2),
2019                BoundaryOrder::ASCENDING,
2020            );
2021            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2022        } else {
2023            unreachable!()
2024        };
2025        //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2026        assert!(&column_index[0][3].is_sorted());
2027        if let Index::INT32(index) = &column_index[0][3] {
2028            check_native_page_index(
2029                index,
2030                325,
2031                get_row_group_min_max_bytes(row_group_metadata, 3),
2032                BoundaryOrder::ASCENDING,
2033            );
2034            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2035        } else {
2036            unreachable!()
2037        };
2038        //col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2039        assert!(&column_index[0][4].is_sorted());
2040        if let Index::INT32(index) = &column_index[0][4] {
2041            check_native_page_index(
2042                index,
2043                325,
2044                get_row_group_min_max_bytes(row_group_metadata, 4),
2045                BoundaryOrder::ASCENDING,
2046            );
2047            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2048        } else {
2049            unreachable!()
2050        };
2051        //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
2052        assert!(!&column_index[0][5].is_sorted());
2053        if let Index::INT64(index) = &column_index[0][5] {
2054            check_native_page_index(
2055                index,
2056                528,
2057                get_row_group_min_max_bytes(row_group_metadata, 5),
2058                BoundaryOrder::UNORDERED,
2059            );
2060            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2061        } else {
2062            unreachable!()
2063        };
2064        //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
2065        assert!(&column_index[0][6].is_sorted());
2066        if let Index::FLOAT(index) = &column_index[0][6] {
2067            check_native_page_index(
2068                index,
2069                325,
2070                get_row_group_min_max_bytes(row_group_metadata, 6),
2071                BoundaryOrder::ASCENDING,
2072            );
2073            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2074        } else {
2075            unreachable!()
2076        };
2077        //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
2078        assert!(!&column_index[0][7].is_sorted());
2079        if let Index::DOUBLE(index) = &column_index[0][7] {
2080            check_native_page_index(
2081                index,
2082                528,
2083                get_row_group_min_max_bytes(row_group_metadata, 7),
2084                BoundaryOrder::UNORDERED,
2085            );
2086            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2087        } else {
2088            unreachable!()
2089        };
2090        //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
2091        assert!(!&column_index[0][8].is_sorted());
2092        if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
2093            check_native_page_index(
2094                index,
2095                974,
2096                get_row_group_min_max_bytes(row_group_metadata, 8),
2097                BoundaryOrder::UNORDERED,
2098            );
2099            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2100        } else {
2101            unreachable!()
2102        };
2103        //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2104        assert!(&column_index[0][9].is_sorted());
2105        if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
2106            check_native_page_index(
2107                index,
2108                352,
2109                get_row_group_min_max_bytes(row_group_metadata, 9),
2110                BoundaryOrder::ASCENDING,
2111            );
2112            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2113        } else {
2114            unreachable!()
2115        };
2116        //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
2117        //Notice: min_max values for each page for this col not exits.
2118        assert!(!&column_index[0][10].is_sorted());
2119        if let Index::NONE = &column_index[0][10] {
2120            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2121        } else {
2122            unreachable!()
2123        };
2124        //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
2125        assert!(&column_index[0][11].is_sorted());
2126        if let Index::INT32(index) = &column_index[0][11] {
2127            check_native_page_index(
2128                index,
2129                325,
2130                get_row_group_min_max_bytes(row_group_metadata, 11),
2131                BoundaryOrder::ASCENDING,
2132            );
2133            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2134        } else {
2135            unreachable!()
2136        };
2137        //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
2138        assert!(!&column_index[0][12].is_sorted());
2139        if let Index::INT32(index) = &column_index[0][12] {
2140            check_native_page_index(
2141                index,
2142                325,
2143                get_row_group_min_max_bytes(row_group_metadata, 12),
2144                BoundaryOrder::UNORDERED,
2145            );
2146            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2147        } else {
2148            unreachable!()
2149        };
2150    }
2151
2152    fn check_native_page_index<T: ParquetValueType>(
2153        row_group_index: &NativeIndex<T>,
2154        page_size: usize,
2155        min_max: (&[u8], &[u8]),
2156        boundary_order: BoundaryOrder,
2157    ) {
2158        assert_eq!(row_group_index.indexes.len(), page_size);
2159        assert_eq!(row_group_index.boundary_order, boundary_order);
2160        row_group_index.indexes.iter().all(|x| {
2161            x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
2162                && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
2163        });
2164    }
2165
2166    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2167        let statistics = r.column(col_num).statistics().unwrap();
2168        (
2169            statistics.min_bytes_opt().unwrap_or_default(),
2170            statistics.max_bytes_opt().unwrap_or_default(),
2171        )
2172    }
2173
2174    #[test]
2175    fn test_skip_next_page_with_dictionary_page() {
2176        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2177        let builder = ReadOptionsBuilder::new();
2178        // enable read page index
2179        let options = builder.with_page_index().build();
2180        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2181        let reader = reader_result.unwrap();
2182
2183        let row_group_reader = reader.get_row_group(0).unwrap();
2184
2185        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
2186        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2187
2188        let mut vec = vec![];
2189
2190        // Step 1: Peek and ensure dictionary page is correctly identified
2191        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2192        assert!(meta.is_dict);
2193
2194        // Step 2: Call skip_next_page to skip the dictionary page
2195        column_page_reader.skip_next_page().unwrap();
2196
2197        // Step 3: Read the next data page after skipping the dictionary page
2198        let page = column_page_reader.get_next_page().unwrap().unwrap();
2199        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2200
2201        // Step 4: Continue reading remaining data pages and verify correctness
2202        for _i in 0..351 {
2203            // 352 total pages, 1 dictionary page is skipped
2204            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2205            assert!(!meta.is_dict); // Verify no dictionary page here
2206            vec.push(meta);
2207
2208            let page = column_page_reader.get_next_page().unwrap().unwrap();
2209            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2210        }
2211
2212        // Step 5: Check if all pages are read
2213        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2214        assert!(column_page_reader.get_next_page().unwrap().is_none());
2215
2216        // Step 6: Verify the number of data pages read (should be 351 data pages)
2217        assert_eq!(vec.len(), 351);
2218    }
2219
2220    #[test]
2221    fn test_skip_page_with_offset_index() {
2222        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2223        let builder = ReadOptionsBuilder::new();
2224        //enable read page index
2225        let options = builder.with_page_index().build();
2226        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2227        let reader = reader_result.unwrap();
2228
2229        let row_group_reader = reader.get_row_group(0).unwrap();
2230
2231        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
2232        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2233
2234        let mut vec = vec![];
2235
2236        for i in 0..325 {
2237            if i % 2 == 0 {
2238                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2239            } else {
2240                column_page_reader.skip_next_page().unwrap();
2241            }
2242        }
2243        //check read all pages.
2244        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2245        assert!(column_page_reader.get_next_page().unwrap().is_none());
2246
2247        assert_eq!(vec.len(), 163);
2248    }
2249
2250    #[test]
2251    fn test_skip_page_without_offset_index() {
2252        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2253
2254        // use default SerializedFileReader without read offsetIndex
2255        let reader_result = SerializedFileReader::new(test_file);
2256        let reader = reader_result.unwrap();
2257
2258        let row_group_reader = reader.get_row_group(0).unwrap();
2259
2260        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
2261        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2262
2263        let mut vec = vec![];
2264
2265        for i in 0..325 {
2266            if i % 2 == 0 {
2267                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2268            } else {
2269                column_page_reader.peek_next_page().unwrap().unwrap();
2270                column_page_reader.skip_next_page().unwrap();
2271            }
2272        }
2273        //check read all pages.
2274        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2275        assert!(column_page_reader.get_next_page().unwrap().is_none());
2276
2277        assert_eq!(vec.len(), 163);
2278    }
2279
2280    #[test]
2281    fn test_peek_page_with_dictionary_page() {
2282        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2283        let builder = ReadOptionsBuilder::new();
2284        //enable read page index
2285        let options = builder.with_page_index().build();
2286        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2287        let reader = reader_result.unwrap();
2288        let row_group_reader = reader.get_row_group(0).unwrap();
2289
2290        //use 'string_col', Boundary order: UNORDERED, total 352 data ages and 1 dictionary page.
2291        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2292
2293        let mut vec = vec![];
2294
2295        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2296        assert!(meta.is_dict);
2297        let page = column_page_reader.get_next_page().unwrap().unwrap();
2298        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2299
2300        for i in 0..352 {
2301            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2302            // have checked with `parquet-tools column-index   -c string_col  ./alltypes_tiny_pages.parquet`
2303            // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
2304            if i != 351 {
2305                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2306            } else {
2307                // last page first row index is 7290, total row count is 7300
2308                // because first row start with zero, last page row count should be 10.
2309                assert_eq!(meta.num_rows, Some(10));
2310            }
2311            assert!(!meta.is_dict);
2312            vec.push(meta);
2313            let page = column_page_reader.get_next_page().unwrap().unwrap();
2314            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2315        }
2316
2317        //check read all pages.
2318        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2319        assert!(column_page_reader.get_next_page().unwrap().is_none());
2320
2321        assert_eq!(vec.len(), 352);
2322    }
2323
2324    #[test]
2325    fn test_peek_page_with_dictionary_page_without_offset_index() {
2326        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2327
2328        let reader_result = SerializedFileReader::new(test_file);
2329        let reader = reader_result.unwrap();
2330        let row_group_reader = reader.get_row_group(0).unwrap();
2331
2332        //use 'string_col', Boundary order: UNORDERED, total 352 data ages and 1 dictionary page.
2333        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2334
2335        let mut vec = vec![];
2336
2337        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2338        assert!(meta.is_dict);
2339        let page = column_page_reader.get_next_page().unwrap().unwrap();
2340        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2341
2342        for i in 0..352 {
2343            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2344            // have checked with `parquet-tools column-index   -c string_col  ./alltypes_tiny_pages.parquet`
2345            // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
2346            if i != 351 {
2347                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2348            } else {
2349                // last page first row index is 7290, total row count is 7300
2350                // because first row start with zero, last page row count should be 10.
2351                assert_eq!(meta.num_levels, Some(10));
2352            }
2353            assert!(!meta.is_dict);
2354            vec.push(meta);
2355            let page = column_page_reader.get_next_page().unwrap().unwrap();
2356            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2357        }
2358
2359        //check read all pages.
2360        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2361        assert!(column_page_reader.get_next_page().unwrap().is_none());
2362
2363        assert_eq!(vec.len(), 352);
2364    }
2365
2366    #[test]
2367    fn test_fixed_length_index() {
2368        let message_type = "
2369        message test_schema {
2370          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2371        }
2372        ";
2373
2374        let schema = parse_message_type(message_type).unwrap();
2375        let mut out = Vec::with_capacity(1024);
2376        let mut writer =
2377            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2378
2379        let mut r = writer.next_row_group().unwrap();
2380        let mut c = r.next_column().unwrap().unwrap();
2381        c.typed::<FixedLenByteArrayType>()
2382            .write_batch(
2383                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2384                Some(&[1, 1, 0, 1]),
2385                None,
2386            )
2387            .unwrap();
2388        c.close().unwrap();
2389        r.close().unwrap();
2390        writer.close().unwrap();
2391
2392        let b = Bytes::from(out);
2393        let options = ReadOptionsBuilder::new().with_page_index().build();
2394        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2395        let index = reader.metadata().column_index().unwrap();
2396
2397        // 1 row group
2398        assert_eq!(index.len(), 1);
2399        let c = &index[0];
2400        // 1 column
2401        assert_eq!(c.len(), 1);
2402
2403        match &c[0] {
2404            Index::FIXED_LEN_BYTE_ARRAY(v) => {
2405                assert_eq!(v.indexes.len(), 1);
2406                let page_idx = &v.indexes[0];
2407                assert_eq!(page_idx.null_count.unwrap(), 1);
2408                assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
2409                assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
2410            }
2411            _ => unreachable!(),
2412        }
2413    }
2414
2415    #[test]
2416    fn test_multi_gz() {
2417        let file = get_test_file("concatenated_gzip_members.parquet");
2418        let reader = SerializedFileReader::new(file).unwrap();
2419        let row_group_reader = reader.get_row_group(0).unwrap();
2420        match row_group_reader.get_column_reader(0).unwrap() {
2421            ColumnReader::Int64ColumnReader(mut reader) => {
2422                let mut buffer = Vec::with_capacity(1024);
2423                let mut def_levels = Vec::with_capacity(1024);
2424                let (num_records, num_values, num_levels) = reader
2425                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2426                    .unwrap();
2427
2428                assert_eq!(num_records, 513);
2429                assert_eq!(num_values, 513);
2430                assert_eq!(num_levels, 513);
2431
2432                let expected: Vec<i64> = (1..514).collect();
2433                assert_eq!(&buffer, &expected);
2434            }
2435            _ => unreachable!(),
2436        }
2437    }
2438
2439    #[test]
2440    fn test_byte_stream_split_extended() {
2441        let path = format!(
2442            "{}/byte_stream_split_extended.gzip.parquet",
2443            arrow::util::test_util::parquet_test_data(),
2444        );
2445        let file = File::open(path).unwrap();
2446        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2447
2448        // Use full schema as projected schema
2449        let mut iter = reader
2450            .get_row_iter(None)
2451            .expect("Failed to create row iterator");
2452
2453        let mut start = 0;
2454        let end = reader.metadata().file_metadata().num_rows();
2455
2456        let check_row = |row: Result<Row, ParquetError>| {
2457            assert!(row.is_ok());
2458            let r = row.unwrap();
2459            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2460            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2461            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2462            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2463            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2464            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2465            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2466        };
2467
2468        while start < end {
2469            match iter.next() {
2470                Some(row) => check_row(row),
2471                None => break,
2472            };
2473            start += 1;
2474        }
2475    }
2476
2477    #[test]
2478    fn test_filtered_rowgroup_metadata() {
2479        let message_type = "
2480            message test_schema {
2481                REQUIRED INT32 a;
2482            }
2483        ";
2484        let schema = Arc::new(parse_message_type(message_type).unwrap());
2485        let props = Arc::new(
2486            WriterProperties::builder()
2487                .set_statistics_enabled(EnabledStatistics::Page)
2488                .build(),
2489        );
2490        let mut file: File = tempfile::tempfile().unwrap();
2491        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2492        let data = [1, 2, 3, 4, 5];
2493
2494        // write 5 row groups
2495        for idx in 0..5 {
2496            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2497            let mut row_group_writer = file_writer.next_row_group().unwrap();
2498            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2499                writer
2500                    .typed::<Int32Type>()
2501                    .write_batch(data_i.as_slice(), None, None)
2502                    .unwrap();
2503                writer.close().unwrap();
2504            }
2505            row_group_writer.close().unwrap();
2506            file_writer.flushed_row_groups();
2507        }
2508        let file_metadata = file_writer.close().unwrap();
2509
2510        assert_eq!(file_metadata.num_rows, 25);
2511        assert_eq!(file_metadata.row_groups.len(), 5);
2512
2513        // read only the 3rd row group
2514        let read_options = ReadOptionsBuilder::new()
2515            .with_page_index()
2516            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2517            .build();
2518        let reader =
2519            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2520                .unwrap();
2521        let metadata = reader.metadata();
2522
2523        // check we got the expected row group
2524        assert_eq!(metadata.num_row_groups(), 1);
2525        assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2526
2527        // check we only got the relevant page indexes
2528        assert!(metadata.column_index().is_some());
2529        assert!(metadata.offset_index().is_some());
2530        assert_eq!(metadata.column_index().unwrap().len(), 1);
2531        assert_eq!(metadata.offset_index().unwrap().len(), 1);
2532        let col_idx = metadata.column_index().unwrap();
2533        let off_idx = metadata.offset_index().unwrap();
2534        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2535        let pg_idx = &col_idx[0][0];
2536        let off_idx_i = &off_idx[0][0];
2537
2538        // test that we got the index matching the row group
2539        match pg_idx {
2540            Index::INT32(int_idx) => {
2541                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2542                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2543                assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2544                assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2545            }
2546            _ => panic!("wrong stats type"),
2547        }
2548
2549        // check offset index matches too
2550        assert_eq!(
2551            off_idx_i.page_locations[0].offset,
2552            metadata.row_group(0).column(0).data_page_offset()
2553        );
2554
2555        // read non-contiguous row groups
2556        let read_options = ReadOptionsBuilder::new()
2557            .with_page_index()
2558            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2559            .build();
2560        let reader =
2561            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2562                .unwrap();
2563        let metadata = reader.metadata();
2564
2565        // check we got the expected row groups
2566        assert_eq!(metadata.num_row_groups(), 2);
2567        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2568        assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2569
2570        // check we only got the relevant page indexes
2571        assert!(metadata.column_index().is_some());
2572        assert!(metadata.offset_index().is_some());
2573        assert_eq!(metadata.column_index().unwrap().len(), 2);
2574        assert_eq!(metadata.offset_index().unwrap().len(), 2);
2575        let col_idx = metadata.column_index().unwrap();
2576        let off_idx = metadata.offset_index().unwrap();
2577
2578        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2579            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2580            let pg_idx = &col_idx_i[0];
2581            let off_idx_i = &off_idx[i][0];
2582
2583            // test that we got the index matching the row group
2584            match pg_idx {
2585                Index::INT32(int_idx) => {
2586                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2587                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2588                    assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2589                    assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2590                }
2591                _ => panic!("wrong stats type"),
2592            }
2593
2594            // check offset index matches too
2595            assert_eq!(
2596                off_idx_i.page_locations[0].offset,
2597                metadata.row_group(i).column(0).data_page_offset()
2598            );
2599        }
2600    }
2601}