// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains implementations of the reader traits [`FileReader`], [`RowGroupReader`] and [`PageReader`].
//! Also contains implementations of [`ChunkReader`] for files (with buffering) and byte arrays (RAM).

use crate::basic::{Encoding, Type};
use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::{
    metadata::*,
    properties::{ReaderProperties, ReaderPropertiesPtr},
    reader::*,
    statistics,
};
use crate::format::{PageHeader, PageLocation, PageType};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
use std::collections::VecDeque;
use std::iter;
use std::{fs::File, io::Read, path::Path, sync::Arc};
use thrift::protocol::TCompactInputProtocol;

impl TryFrom<File> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(file: File) -> Result<Self> {
        Self::new(file)
    }
}

impl TryFrom<&Path> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &Path) -> Result<Self> {
        let file = File::open(path)?;
        Self::try_from(file)
    }
}

impl TryFrom<String> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: String) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

impl TryFrom<&str> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &str) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

/// Conversion into a [`RowIter`]
/// using the full file schema over all row groups.
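///
/// A minimal sketch of iterating a whole file this way (the path
/// `data.parquet` is a placeholder):
///
/// ```no_run
/// use parquet::file::serialized_reader::SerializedFileReader;
///
/// let reader = SerializedFileReader::try_from("data.parquet").unwrap();
/// for row in reader {
///     println!("{:?}", row.unwrap());
/// }
/// ```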
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        RowIter::from_file_into(Box::new(self))
    }
}

// ----------------------------------------------------------------------
// Implementations of file & row group readers

/// A serialized implementation for Parquet [`FileReader`].
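///
/// A minimal usage sketch (the path `data.parquet` is a placeholder; assumes
/// it points at a valid Parquet file):
///
/// ```no_run
/// use std::fs::File;
/// use parquet::file::reader::{FileReader, SerializedFileReader};
///
/// let file = File::open("data.parquet").unwrap();
/// let reader = SerializedFileReader::new(file).unwrap();
/// println!("row groups: {}", reader.num_row_groups());
/// for row in reader.get_row_iter(None).unwrap() {
///     println!("{:?}", row.unwrap());
/// }
/// ```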
pub struct SerializedFileReader<R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: Arc<ParquetMetaData>,
    props: ReaderPropertiesPtr,
}

/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;

/// A builder for [`ReadOptions`].
///
/// Predicates added to the builder are chained with a logical `AND` when
/// filtering row groups.
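///
/// Illustrative sketch (the predicate and options are arbitrary):
///
/// ```
/// use parquet::file::serialized_reader::ReadOptionsBuilder;
///
/// // Scan only the first two row groups and also read the page index.
/// let options = ReadOptionsBuilder::new()
///     .with_predicate(Box::new(|_, i| i < 2))
///     .with_page_index()
///     .build();
/// ```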
#[derive(Default)]
pub struct ReadOptionsBuilder {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: Option<ReaderProperties>,
}

impl ReadOptionsBuilder {
    /// New builder
    pub fn new() -> Self {
        Self::default()
    }

    /// Add a predicate on row group metadata to the read options.
    /// Only row groups matching the predicate will be scanned.
    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
        self.predicates.push(predicate);
        self
    }

    /// Add a predicate that keeps only row groups whose midpoint byte offset
    /// falls within the half-open range `[start, end)`. For example,
    /// `with_range(0, 4096)` keeps row groups whose midpoint is before byte 4096.
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        assert!(start < end);
        let predicate = move |rg: &RowGroupMetaData, _: usize| {
            let mid = get_midpoint_offset(rg);
            mid >= start && mid < end
        };
        self.predicates.push(Box::new(predicate));
        self
    }

    /// Enable reading the page index structures described in
    /// "[Column Index] Layout to Support Page Skipping"
    ///
    /// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    pub fn with_page_index(mut self) -> Self {
        self.enable_page_index = true;
        self
    }

    /// Set the [`ReaderProperties`] configuration.
    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
        self.props = Some(properties);
        self
    }

    /// Seal the builder and return the read options
    pub fn build(self) -> ReadOptions {
        let props = self
            .props
            .unwrap_or_else(|| ReaderProperties::builder().build());
        ReadOptions {
            predicates: self.predicates,
            enable_page_index: self.enable_page_index,
            props,
        }
    }
}

/// A collection of options for reading a Parquet file.
///
/// Currently, only predicates on row group metadata are supported.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: ReaderProperties,
}

impl<R: 'static + ChunkReader> SerializedFileReader<R> {
    /// Creates a file reader from a Parquet file.
    /// Returns an error if the Parquet file does not exist or is corrupt.
    pub fn new(chunk_reader: R) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
        let props = Arc::new(ReaderProperties::builder().build());
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props,
        })
    }

    /// Creates a file reader from a Parquet file with read options.
    /// Returns an error if the Parquet file does not exist or is corrupt.
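    ///
    /// Illustrative sketch (the path `data.parquet` is a placeholder):
    ///
    /// ```no_run
    /// use std::fs::File;
    /// use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};
    ///
    /// let options = ReadOptionsBuilder::new().with_page_index().build();
    /// let file = File::open("data.parquet").unwrap();
    /// let reader = SerializedFileReader::new_with_options(file, options).unwrap();
    /// ```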
    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
        let mut metadata_builder = ParquetMetaDataReader::new()
            .parse_and_finish(&chunk_reader)?
            .into_builder();
        let mut predicates = options.predicates;

        // Filter row groups based on the predicates
        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
            let mut keep = true;
            for predicate in &mut predicates {
                if !predicate(&rg_meta, i) {
                    keep = false;
                    break;
                }
            }
            if keep {
                metadata_builder = metadata_builder.add_row_group(rg_meta);
            }
        }

        let mut metadata = metadata_builder.build();

        // If page indexes are desired, build them with the filtered set of row groups
        if options.enable_page_index {
            let mut reader =
                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
            reader.read_page_indexes(&chunk_reader)?;
            metadata = reader.finish()?;
        }

        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props: Arc::new(options.props),
        })
    }
}

/// Get midpoint offset for a row group
fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
    let col = meta.column(0);
    let mut offset = col.data_page_offset();
    if let Some(dic_offset) = col.dictionary_page_offset() {
        if offset > dic_offset {
            offset = dic_offset
        }
    };
    offset + meta.compressed_size() / 2
}

impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
    fn metadata(&self) -> &ParquetMetaData {
        &self.metadata
    }

    fn num_row_groups(&self) -> usize {
        self.metadata.num_row_groups()
    }

    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
        let row_group_metadata = self.metadata.row_group(i);
        // Row groups should be processed sequentially.
        let props = Arc::clone(&self.props);
        let f = Arc::clone(&self.chunk_reader);
        Ok(Box::new(SerializedRowGroupReader::new(
            f,
            row_group_metadata,
            self.metadata.offset_index().map(|x| x[i].as_slice()),
            props,
        )?))
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
        RowIter::from_file(projection, self)
    }
}

/// A serialized implementation for Parquet [`RowGroupReader`].
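///
/// Typically obtained via [`FileReader::get_row_group`]; a minimal sketch
/// (the path `data.parquet` is a placeholder):
///
/// ```no_run
/// use parquet::file::reader::{FileReader, RowGroupReader, SerializedFileReader};
///
/// let reader = SerializedFileReader::try_from("data.parquet").unwrap();
/// let row_group = reader.get_row_group(0).unwrap();
/// println!("columns: {}", row_group.num_columns());
/// ```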
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: &'a RowGroupMetaData,
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    props: ReaderPropertiesPtr,
    bloom_filters: Vec<Option<Sbbf>>,
}

impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
    /// Creates a new row group reader from a chunk reader, row group metadata and reader properties.
    pub fn new(
        chunk_reader: Arc<R>,
        metadata: &'a RowGroupMetaData,
        offset_index: Option<&'a [OffsetIndexMetaData]>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let bloom_filters = if props.read_bloom_filter() {
            metadata
                .columns()
                .iter()
                .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
                .collect::<Result<Vec<_>>>()?
        } else {
            iter::repeat(None).take(metadata.columns().len()).collect()
        };
        Ok(Self {
            chunk_reader,
            metadata,
            offset_index,
            props,
            bloom_filters,
        })
    }
}

impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
    fn metadata(&self) -> &RowGroupMetaData {
        self.metadata
    }

    fn num_columns(&self) -> usize {
        self.metadata.num_columns()
    }

    // TODO: fix PARQUET-816
    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
        let col = self.metadata.column(i);

        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());

        let props = Arc::clone(&self.props);
        Ok(Box::new(SerializedPageReader::new_with_properties(
            Arc::clone(&self.chunk_reader),
            col,
            usize::try_from(self.metadata.num_rows())?,
            page_locations,
            props,
        )?))
    }

    /// Get the bloom filter for the `i`th column
    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
        self.bloom_filters[i].as_ref()
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
        RowIter::from_row_group(projection, self)
    }
}

/// Reads a [`PageHeader`] from the provided [`Read`]
pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
    let mut prot = TCompactInputProtocol::new(input);
    Ok(PageHeader::read_from_in_protocol(&mut prot)?)
}

#[cfg(feature = "encryption")]
pub(crate) fn read_encrypted_page_header<T: Read>(
    input: &mut T,
    crypto_context: Arc<CryptoContext>,
) -> Result<PageHeader> {
    let data_decryptor = crypto_context.data_decryptor();
    let aad = crypto_context.create_page_header_aad()?;

    let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
        ParquetError::General(format!(
            "Error decrypting column {}, decryptor may be wrong or missing",
            crypto_context.column_ordinal
        ))
    })?;

    let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
    Ok(PageHeader::read_from_in_protocol(&mut prot)?)
}

/// Reads a [`PageHeader`] from the provided [`Read`], returning the number of bytes read.
/// If the page header is encrypted, a [`CryptoContext`] must be provided.
#[cfg(feature = "encryption")]
fn read_encrypted_page_header_len<T: Read>(
    input: &mut T,
    crypto_context: Option<Arc<CryptoContext>>,
) -> Result<(usize, PageHeader)> {
    /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
    struct TrackedRead<R> {
        inner: R,
        bytes_read: usize,
    }

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.inner.read(buf)?;
            self.bytes_read += v;
            Ok(v)
        }
    }

    let mut tracked = TrackedRead {
        inner: input,
        bytes_read: 0,
    };
    let header = read_encrypted_page_header(&mut tracked, crypto_context.unwrap())?;
    Ok((tracked.bytes_read, header))
}

/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read.
fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
    /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
    struct TrackedRead<R> {
        inner: R,
        bytes_read: usize,
    }

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.inner.read(buf)?;
            self.bytes_read += v;
            Ok(v)
        }
    }

    let mut tracked = TrackedRead {
        inner: input,
        bytes_read: 0,
    };
    let header = read_page_header(&mut tracked)?;
    Ok((tracked.bytes_read, header))
}

/// Decodes a [`Page`] from the provided `buffer`
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the 32-bit CRC checksum of the page
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // When processing a data page v2, depending on the compression enabled for
    // the page, we should account for the uncompressed data ('offset') of the
    // repetition and definition levels.
    //
    // We always use a 0 offset for pages other than v2; the `true` flag means
    // that compression will be applied if a decompressor is defined.
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                    for definition_levels_byte_length ({}) \
                    and repetition_levels_byte_length ({}) \
                    given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        // When the `is_compressed` flag is missing, the page is considered compressed
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    // TODO: page header could be huge because of statistics. We should set a
    // maximum page header size and abort if that is exceeded.
    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
            if decompressed_size > 0 {
                let compressed = &buffer.as_ref()[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        _ => buffer,
    };

    let result = match page_header.type_ {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: Encoding::try_from(dict_header.encoding)?,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: Encoding::try_from(header.encoding)?,
                def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
                rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
                statistics: statistics::from_thrift(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: Encoding::try_from(header.encoding)?,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift(physical_type, header.statistics)?,
            }
        }
        _ => {
            // Unsupported page types (e.g., INDEX_PAGE) are skipped by the page
            // readers before decoding, so reaching this arm is unexpected.
            unimplemented!("Page type {:?} is not supported", page_header.type_)
        }
    };

    Ok(result)
}

enum SerializedPageReaderState {
    Values {
        /// The current byte offset in the reader
        offset: usize,

        /// The length of the chunk in bytes
        remaining_bytes: usize,

        /// If the next page header has already been "peeked", it is cached here
        next_page_header: Option<Box<PageHeader>>,

        /// The index of the data page within this column chunk
        page_ordinal: usize,

        /// Whether the next page is expected to be a dictionary page
        require_dictionary: bool,
    },
    Pages {
        /// Remaining page locations
        page_locations: VecDeque<PageLocation>,
        /// Remaining dictionary location if any
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
    },
}

/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader
    reader: Arc<R>,

    /// The compression codec for this column chunk. Only set for compressed
    /// columns (i.e. a codec other than `UNCOMPRESSED`).
    decompressor: Option<Box<dyn Codec>>,

    /// Column chunk type.
    physical_type: Type,

    state: SerializedPageReaderState,

    /// Crypto context carrying objects required for decryption
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}

impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and metadata
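    ///
    /// A minimal sketch of reading every page of the first column of the first
    /// row group (the path `data.parquet` is a placeholder):
    ///
    /// ```no_run
    /// use std::{fs::File, sync::Arc};
    /// use parquet::file::reader::FileReader;
    /// use parquet::file::serialized_reader::{SerializedFileReader, SerializedPageReader};
    ///
    /// let reader = SerializedFileReader::try_from("data.parquet").unwrap();
    /// let row_group = reader.metadata().row_group(0);
    /// let page_reader = SerializedPageReader::new(
    ///     Arc::new(File::open("data.parquet").unwrap()),
    ///     row_group.column(0),
    ///     row_group.num_rows() as usize,
    ///     None,
    /// )
    /// .unwrap();
    /// for page in page_reader {
    ///     println!("{:?}", page.unwrap().page_type());
    /// }
    /// ```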
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// No-op stub used when encryption is disabled.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Adds any necessary crypto context to this page reader, if encryption is enabled.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page reader with custom properties.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: usize::try_from(start)?,
                remaining_bytes: usize::try_from(len)?,
                next_page_header: None,
                page_ordinal: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            #[cfg(feature = "encryption")]
            crypto_context: None,
        })
    }

    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
    /// Unlike page metadata, an offset can uniquely identify a page.
    ///
    /// This is used when reading Parquet with a row filter, where we want to avoid decompressing a page twice.
    /// It allows checking whether the next page has already been cached or read.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                ..
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                if let Some(page) = dictionary_page {
                    Ok(Some(usize::try_from(page.offset)?))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(usize::try_from(page.offset)?))
                } else {
                    Ok(None)
                }
            }
        }
    }
}

impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
    type Item = Result<Page>;

    fn next(&mut self) -> Option<Self::Item> {
        self.get_next_page().transpose()
    }
}

fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<()> {
    if header_len > remaining_bytes {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

fn verify_page_size(
    compressed_size: i32,
    uncompressed_size: i32,
    remaining_bytes: usize,
) -> Result<()> {
    // The page's compressed size should not exceed the remaining bytes that are
    // available to read. The page's uncompressed size is the expected size
    // after decompression, which can never be negative.
    if compressed_size < 0 || compressed_size as usize > remaining_bytes || uncompressed_size < 0 {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_ordinal,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset as u64)?;
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        #[cfg(feature = "encryption")]
                        let (header_len, header) = if self.crypto_context.is_some() {
                            let crypto_context = page_crypto_context(
                                &self.crypto_context,
                                *page_ordinal,
                                *require_dictionary,
                            )?;
                            read_encrypted_page_header_len(&mut read, crypto_context)?
                        } else {
                            read_page_header_len(&mut read)?
                        };

                        #[cfg(not(feature = "encryption"))]
                        let (header_len, header) = read_page_header_len(&mut read)?;

                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len;
                        *remaining -= header_len;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    *offset += data_len;
                    *remaining -= data_len;

                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    #[cfg(feature = "encryption")]
                    let crypto_context = page_crypto_context(
                        &self.crypto_context,
                        *page_ordinal,
                        *require_dictionary,
                    )?;
                    #[cfg(feature = "encryption")]
                    let buffer: Vec<u8> = if let Some(crypto_context) = crypto_context {
                        let decryptor = crypto_context.data_decryptor();
                        let aad = crypto_context.create_page_aad()?;
                        decryptor.decrypt(buffer.as_ref(), &aad)?
                    } else {
                        buffer
                    };

                    let page = decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    if page.is_data_page() {
                        *page_ordinal += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    ..
                } => {
                    let front = match dictionary_page
                        .take()
                        .or_else(|| page_locations.pop_front())
                    {
                        Some(front) => front,
                        None => return Ok(None),
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;

                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
                    let header = PageHeader::read_from_in_protocol(&mut prot)?;
                    let offset = buffer.len() - prot.as_slice().len();

                    let bytes = buffer.slice(offset..);
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                ..
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                ..
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The next page header has already been peeked, so just advance the offset
                    *offset += buffered_header.compressed_page_size as usize;
                    *remaining_bytes -= buffered_header.compressed_page_size as usize;
                } else {
                    let mut read = self.reader.get_read(*offset as u64)?;
                    let (header_len, header) = read_page_header_len(&mut read)?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    let data_page_size = header.compressed_page_size as usize;
                    *offset += header_len + data_page_size;
                    *remaining_bytes -= header_len + data_page_size;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                if dictionary_page.is_some() {
                    // If a dictionary page exists, consume it by taking it (sets to None)
                    dictionary_page.take();
                } else {
                    // If no dictionary page exists, simply pop the data page from page_locations
                    page_locations.pop_front();
                }

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}

#[cfg(feature = "encryption")]
fn page_crypto_context(
    crypto_context: &Option<Arc<CryptoContext>>,
    page_ordinal: usize,
    dictionary_page: bool,
) -> Result<Option<Arc<CryptoContext>>> {
    Ok(crypto_context.as_ref().map(|c| {
        Arc::new(if dictionary_page {
            c.for_dictionary_page()
        } else {
            c.with_page_ordinal(page_ordinal)
        })
    }))
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Buf;

    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use crate::format::BoundaryOrder;

    use crate::basic::{self, ColumnOrder, SortOrder};
    use crate::column::reader::ColumnReader;
    use crate::data_type::private::ParquetValueType;
    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
    use crate::file::page_index::index::{Index, NativeIndex};
    use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
    use crate::file::writer::SerializedFileWriter;
    use crate::record::RowAccessor;
    use crate::schema::parser::parse_message_type;
    use crate::util::test_common::file_util::{get_test_file, get_test_path};

    use super::*;

    #[test]
    fn test_cursor_and_file_has_the_same_behaviour() {
        let mut buf: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut buf)
            .unwrap();
        let cursor = Bytes::from(buf);
        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_from_file = SerializedFileReader::new(test_file).unwrap();

        let file_iter = read_from_file.get_row_iter(None).unwrap();
        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();

        for (a, b) in file_iter.zip(cursor_iter) {
            assert_eq!(a.unwrap(), b.unwrap())
        }
    }

    #[test]
    fn test_file_reader_try_from() {
        // Valid file path
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        // Invalid file path
        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }

    #[test]
    fn test_file_reader_into_iter() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let iter = reader.into_iter();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[test]
    fn test_file_reader_into_iter_project() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let schema = "message schema { OPTIONAL INT32 id; }";
        let proj = parse_message_type(schema).ok();
        let iter = reader.into_iter().project(proj).unwrap();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[test]
    fn test_reuse_file_chunk() {
        // This test covers the case of maintaining the correct start position in a file
        // stream for each column reader after initializing and moving to the next one
        // (without necessarily reading the entire column).
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file).unwrap();
        let row_group = reader.get_row_group(0).unwrap();

        let mut page_readers = Vec::new();
        for i in 0..row_group.num_columns() {
            page_readers.push(row_group.get_column_page_reader(i).unwrap());
        }

        // Now buffer each column reader; we do not expect any failures like:
        // General("underlying Thrift error: end of file")
        for mut page_reader in page_readers {
            assert!(page_reader.get_next_page().is_ok());
        }
    }

    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Test contents in row group metadata
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    #[test]
    fn test_file_reader_empty_compressed_datapage_v2() {
        // this file has a compressed datapage that un-compresses to 0 bytes
        let test_file = get_test_file("page_v2_empty_compressed.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-cpp-arrow version 14.0.2"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 10);
        assert_eq!(file_metadata.version(), 2);
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 0);
                    assert_eq!(num_values, 0);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 3);
                    assert_eq!(num_values, 10);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 10);
                    assert_eq!(num_rows, 10);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    #[test]
    fn test_file_reader_empty_datapage_v2() {
        // this file has a 0-byte compressed data page that decompresses to 0 bytes
        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(file_metadata.key_value_metadata().unwrap().len(), 2);

        assert_eq!(file_metadata.num_rows(), 1);
        assert_eq!(file_metadata.version(), 1);
        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
        assert_eq!(
            file_metadata.column_orders(),
            Some(vec![expected_order].as_ref())
        );

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), expected_order);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 2);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 1);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 1);
    }

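    /// Builds a `SerializedPageReader` for the given row group and column
    /// directly from the file reader's metadata, so tests can poke at the
    /// page reader's internal `state`.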
    fn get_serialized_page_reader<R: ChunkReader>(
        file_reader: &SerializedFileReader<R>,
        row_group: usize,
        column: usize,
    ) -> Result<SerializedPageReader<R>> {
        let row_group = {
            let row_group_metadata = file_reader.metadata.row_group(row_group);
            let props = Arc::clone(&file_reader.props);
            let f = Arc::clone(&file_reader.chunk_reader);
            SerializedRowGroupReader::new(
                f,
                row_group_metadata,
                file_reader
                    .metadata
                    .offset_index()
                    .map(|x| x[row_group].as_slice()),
                props,
            )?
        };

        let col = row_group.metadata.column(column);

        let page_locations = row_group
            .offset_index
            .map(|x| x[column].page_locations.clone());

        let props = Arc::clone(&row_group.props);
        SerializedPageReader::new_with_properties(
            Arc::clone(&row_group.chunk_reader),
            col,
            usize::try_from(row_group.metadata.num_rows())?,
            page_locations,
            props,
        )
    }

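    // `peek_next_page_offset` must report the same offset as the page that a
    // subsequent `get_next_page` call actually returns, for every column of
    // every row group.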
    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as usize, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as usize, page_offset);
                            } else {
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }

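    // `FilePageIterator` yields one page reader per selected row group for a
    // single column; `alltypes_plain.parquet` has a single row group, so the
    // iterator should be exhausted after one item.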
    #[test]
    fn test_page_iterator() {
        let file = get_test_file("alltypes_plain.parquet");
        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();

        // read first page
        let page = page_iterator.next();
        assert!(page.is_some());
        assert!(page.unwrap().is_ok());

        // reach end of file
        let page = page_iterator.next();
        assert!(page.is_none());

        let row_group_indices = Box::new(0..1);
        let mut page_iterator =
            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();

        // read first page
        let page = page_iterator.next();
        assert!(page.is_some());
        assert!(page.unwrap().is_ok());

        // reach end of file
        let page = page_iterator.next();
        assert!(page.is_none());
    }

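    // Key/value metadata entries are expected back in the order in which they
    // were written to the file footer.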
    #[test]
    fn test_file_reader_key_value_metadata() {
        let file = get_test_file("binary.parquet");
        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

        let metadata = file_reader
            .metadata
            .file_metadata()
            .key_value_metadata()
            .unwrap();

        assert_eq!(metadata.len(), 3);

        assert_eq!(metadata[0].key, "parquet.proto.descriptor");

        assert_eq!(metadata[1].key, "writer.model.name");
        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));

        assert_eq!(metadata[2].key, "parquet.proto.class");
        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
    }

    #[test]
    fn test_file_reader_optional_metadata() {
        // file with optional metadata: bloom filters, encoding stats, column index and offset index.
        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

        let row_group_metadata = file_reader.metadata.row_group(0);
        let col0_metadata = row_group_metadata.column(0);

        // test optional bloom filter offset
        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);

        // test page encoding stats
        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];

        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
        assert_eq!(page_encoding_stats.count, 1);

        // test optional column index offset
        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);

        // test optional offset index offset
        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
    }

    #[test]
    fn test_file_reader_with_no_filter() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        // test initial number of row groups
        let metadata = origin_reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        Ok(())
    }

    #[test]
    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_predicate(Box::new(|_, _| false))
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        Ok(())
    }

    #[test]
    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        // test initial number of row groups
        let metadata = origin_reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let mid = get_midpoint_offset(metadata.row_group(0));

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        Ok(())
    }

    #[test]
    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        let metadata = origin_reader.metadata();
        let mid = get_midpoint_offset(metadata.row_group(0));

        // true, true predicate
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| true))
            .with_range(mid, mid + 1)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);

        // true, false predicate
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| true))
            .with_range(0, mid)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());

        // false, true predicate
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| false))
            .with_range(mid, mid + 1)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());

        // false, false predicate
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| false))
            .with_range(0, mid)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());
        Ok(())
    }

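    // A byte buffer ending in the correct `PAR1` magic but containing corrupt
    // thrift metadata must surface a parse error rather than panic.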
    #[test]
    fn test_file_reader_invalid_metadata() {
        let data = [
            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
            80, 65, 82, 49,
        ];
        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
        assert_eq!(
            ret.err().unwrap().to_string(),
            "Parquet error: Could not parse metadata: bad data"
        );
    }

    #[test]
    // Use Java parquet-tools to get the page index info below:
    // ```
    // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
    // row group 0:
    // column index for column String:
    // Boundary order: ASCENDING
    // page-0  :
    // null count                 min                                  max
    // 0                          Hello                                today
    //
    // offset index for column String:
    // page-0   :
    // offset   compressed size       first row index
    // 4               152                     0
    // ```
    fn test_page_index_reader() {
        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let builder = ReadOptionsBuilder::new();
        // enable reading the page index
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let column_index = metadata.column_index().unwrap();

        // only one row group
        assert_eq!(column_index.len(), 1);
        let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
            index
        } else {
            unreachable!()
        };

        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
        let index_in_pages = &index.indexes;

        // only one page in the column index
        assert_eq!(index_in_pages.len(), 1);

        let page0 = &index_in_pages[0];
        let min = page0.min.as_ref().unwrap();
        let max = page0.max.as_ref().unwrap();
        assert_eq!(b"Hello", min.as_bytes());
        assert_eq!(b"today", max.as_bytes());

        let offset_indexes = metadata.offset_index().unwrap();
        // only one row group
        assert_eq!(offset_indexes.len(), 1);
        let offset_index = &offset_indexes[0];
        let page_offset = &offset_index[0].page_locations()[0];

        assert_eq!(4, page_offset.offset);
        assert_eq!(152, page_offset.compressed_page_size);
        assert_eq!(0, page_offset.first_row_index);
    }

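    // Reading the column/offset indexes with the columns supplied in reverse
    // order must yield the same entries as reading them in file order.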
    #[test]
    fn test_page_index_reader_out_of_order() {
        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
        let metadata = reader.metadata();

        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
        let columns = metadata.row_group(0).columns();
        let reversed: Vec<_> = columns.iter().cloned().rev().collect();

        let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
        let mut b = read_columns_indexes(&test_file, &reversed)
            .unwrap()
            .unwrap();
        b.reverse();
        assert_eq!(a, b);

        let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
        let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
        b.reverse();
        assert_eq!(a, b);
    }

    #[test]
    fn test_page_index_reader_all_type() {
        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
        let builder = ReadOptionsBuilder::new();
        // enable reading the page index
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let column_index = metadata.column_index().unwrap();
        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];

        // only one row group
        assert_eq!(column_index.len(), 1);
        let row_group_metadata = metadata.row_group(0);

        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
        assert!(!&column_index[0][0].is_sorted());
        let boundary_order = &column_index[0][0].get_boundary_order();
        assert!(boundary_order.is_some());
        assert!(matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED));
        if let Index::INT32(index) = &column_index[0][0] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 0),
                BoundaryOrder::UNORDERED,
            );
            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
        } else {
            unreachable!()
        };
        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
        assert!(&column_index[0][1].is_sorted());
        if let Index::BOOLEAN(index) = &column_index[0][1] {
            assert_eq!(index.indexes.len(), 82);
            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
        } else {
            unreachable!()
        };
        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
        assert!(&column_index[0][2].is_sorted());
        if let Index::INT32(index) = &column_index[0][2] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 2),
                BoundaryOrder::ASCENDING,
            );
            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
        } else {
            unreachable!()
        };
        //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
        assert!(&column_index[0][3].is_sorted());
        if let Index::INT32(index) = &column_index[0][3] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 3),
                BoundaryOrder::ASCENDING,
            );
            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
        } else {
            unreachable!()
        };
        //col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
        assert!(&column_index[0][4].is_sorted());
        if let Index::INT32(index) = &column_index[0][4] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 4),
                BoundaryOrder::ASCENDING,
            );
            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
        } else {
            unreachable!()
        };
        //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
        assert!(!&column_index[0][5].is_sorted());
        if let Index::INT64(index) = &column_index[0][5] {
            check_native_page_index(
                index,
                528,
                get_row_group_min_max_bytes(row_group_metadata, 5),
                BoundaryOrder::UNORDERED,
            );
            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
        } else {
            unreachable!()
        };
        //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
        assert!(&column_index[0][6].is_sorted());
        if let Index::FLOAT(index) = &column_index[0][6] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 6),
                BoundaryOrder::ASCENDING,
            );
            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
        } else {
            unreachable!()
        };
        //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
        assert!(!&column_index[0][7].is_sorted());
        if let Index::DOUBLE(index) = &column_index[0][7] {
            check_native_page_index(
                index,
                528,
                get_row_group_min_max_bytes(row_group_metadata, 7),
                BoundaryOrder::UNORDERED,
            );
            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
        } else {
            unreachable!()
        };
        //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
        assert!(!&column_index[0][8].is_sorted());
        if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
            check_native_page_index(
                index,
                974,
                get_row_group_min_max_bytes(row_group_metadata, 8),
                BoundaryOrder::UNORDERED,
            );
            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
        } else {
            unreachable!()
        };
        //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
        assert!(&column_index[0][9].is_sorted());
        if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
            check_native_page_index(
                index,
                352,
                get_row_group_min_max_bytes(row_group_metadata, 9),
                BoundaryOrder::ASCENDING,
            );
            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
        } else {
            unreachable!()
        };
        //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
        // Note: per-page min/max values for this column do not exist.
        assert!(!&column_index[0][10].is_sorted());
        if let Index::NONE = &column_index[0][10] {
            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
        } else {
            unreachable!()
        };
        //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
        assert!(&column_index[0][11].is_sorted());
        if let Index::INT32(index) = &column_index[0][11] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 11),
                BoundaryOrder::ASCENDING,
            );
            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
        } else {
            unreachable!()
        };
        //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
        assert!(!&column_index[0][12].is_sorted());
        if let Index::INT32(index) = &column_index[0][12] {
            check_native_page_index(
                index,
                325,
                get_row_group_min_max_bytes(row_group_metadata, 12),
                BoundaryOrder::UNORDERED,
            );
            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
        } else {
            unreachable!()
        };
    }

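    /// Asserts the shape of a native page index: the expected number of page
    /// entries (`page_size`), the expected boundary order, and that every
    /// per-page min/max lies within the row group's overall min/max bounds
    /// (`min_max` holds the little-endian encoded row group statistics).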
    fn check_native_page_index<T: ParquetValueType>(
        row_group_index: &NativeIndex<T>,
        page_size: usize,
        min_max: (&[u8], &[u8]),
        boundary_order: BoundaryOrder,
    ) {
        assert_eq!(row_group_index.indexes.len(), page_size);
        assert_eq!(row_group_index.boundary_order, boundary_order);
        assert!(row_group_index.indexes.iter().all(|x| {
            x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
                && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
        }));
    }

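    /// Returns the raw min/max statistics bytes for one column of a row group.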
    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
        let statistics = r.column(col_num).statistics().unwrap();
        (
            statistics.min_bytes_opt().unwrap_or_default(),
            statistics.max_bytes_opt().unwrap_or_default(),
        )
    }

    #[test]
    fn test_skip_next_page_with_dictionary_page() {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let builder = ReadOptionsBuilder::new();
        // enable reading the page index
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        let row_group_reader = reader.get_row_group(0).unwrap();

        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

        let mut vec = vec![];

        // Step 1: Peek and ensure dictionary page is correctly identified
        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
        assert!(meta.is_dict);

        // Step 2: Call skip_next_page to skip the dictionary page
        column_page_reader.skip_next_page().unwrap();

        // Step 3: Read the next data page after skipping the dictionary page
        let page = column_page_reader.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));

        // Step 4: Continue reading remaining data pages and verify correctness
        for _i in 0..351 {
            // 352 total pages, 1 dictionary page is skipped
            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
            assert!(!meta.is_dict); // Verify no dictionary page here
            vec.push(meta);

            let page = column_page_reader.get_next_page().unwrap().unwrap();
            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
        }

        // Step 5: Check if all pages are read
        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        // Step 6: Verify the number of data pages read (should be 351 data pages)
        assert_eq!(vec.len(), 351);
    }

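    // Alternating reads and skips must still exhaust the column: of the 325
    // pages, the 163 even-indexed ones are read and the rest are skipped via
    // the offset index.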
    #[test]
    fn test_skip_page_with_offset_index() {
        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
        let builder = ReadOptionsBuilder::new();
        // enable reading the page index
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        let row_group_reader = reader.get_row_group(0).unwrap();

        // use 'int_col', Boundary order: ASCENDING, 325 pages in total.
        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();

        let mut vec = vec![];

        for i in 0..325 {
            if i % 2 == 0 {
                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
            } else {
                column_page_reader.skip_next_page().unwrap();
            }
        }
        // check that all pages have been read
        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 163);
    }

    #[test]
    fn test_skip_page_without_offset_index() {
        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");

        // use the default SerializedFileReader, which does not read the offset index
        let reader_result = SerializedFileReader::new(test_file);
        let reader = reader_result.unwrap();

        let row_group_reader = reader.get_row_group(0).unwrap();

        // use 'int_col', Boundary order: ASCENDING, 325 pages in total.
        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();

        let mut vec = vec![];

        for i in 0..325 {
            if i % 2 == 0 {
                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
            } else {
                column_page_reader.peek_next_page().unwrap().unwrap();
                column_page_reader.skip_next_page().unwrap();
            }
        }
        // check that all pages have been read
        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 163);
    }

    #[test]
    fn test_peek_page_with_dictionary_page() {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let builder = ReadOptionsBuilder::new();
        // enable reading the page index
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();
        let row_group_reader = reader.get_row_group(0).unwrap();

        // use 'string_col', Boundary order: UNORDERED, 352 data pages and 1 dictionary page in total.
        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

        let mut vec = vec![];

        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
        assert!(meta.is_dict);
        let page = column_page_reader.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

        for i in 0..352 {
            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
            // num_rows is either 21 or 20 for every page except the last one.
            if i != 351 {
                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
            } else {
                // the last page's first row index is 7290 and the total row count is 7300,
                // so (rows being zero-indexed) the last page holds the remaining 10 rows.
                assert_eq!(meta.num_rows, Some(10));
            }
            assert!(!meta.is_dict);
            vec.push(meta);
            let page = column_page_reader.get_next_page().unwrap().unwrap();
            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
        }

        // check that all pages have been read
        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 352);
    }

    #[test]
    fn test_peek_page_with_dictionary_page_without_offset_index() {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");

        let reader_result = SerializedFileReader::new(test_file);
        let reader = reader_result.unwrap();
        let row_group_reader = reader.get_row_group(0).unwrap();

        // use 'string_col', Boundary order: UNORDERED, 352 data pages and 1 dictionary page in total.
        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

        let mut vec = vec![];

        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
        assert!(meta.is_dict);
        let page = column_page_reader.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

        for i in 0..352 {
            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
            // num_levels is either 21 or 20 for every page except the last one.
            if i != 351 {
                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
            } else {
                // the last page's first row index is 7290 and the total row count is 7300,
                // so (rows being zero-indexed) the last page holds the remaining 10 values.
                assert_eq!(meta.num_levels, Some(10));
            }
            assert!(!meta.is_dict);
            vec.push(meta);
            let page = column_page_reader.get_next_page().unwrap().unwrap();
            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
        }

        // check that all pages have been read
        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 352);
    }

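    // Round-trips a FIXED_LEN_BYTE_ARRAY column (three values plus one null)
    // through write and read, then checks the column index: a single page with
    // null_count 1 and the expected 11-byte min/max values.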
    #[test]
    fn test_fixed_length_index() {
        let message_type = "
        message test_schema {
          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
        }
        ";

        let schema = parse_message_type(message_type).unwrap();
        let mut out = Vec::with_capacity(1024);
        let mut writer =
            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();

        let mut r = writer.next_row_group().unwrap();
        let mut c = r.next_column().unwrap().unwrap();
        c.typed::<FixedLenByteArrayType>()
            .write_batch(
                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
                Some(&[1, 1, 0, 1]),
                None,
            )
            .unwrap();
        c.close().unwrap();
        r.close().unwrap();
        writer.close().unwrap();

        let b = Bytes::from(out);
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
        let index = reader.metadata().column_index().unwrap();

        // 1 row group
        assert_eq!(index.len(), 1);
        let c = &index[0];
        // 1 column
        assert_eq!(c.len(), 1);

        match &c[0] {
            Index::FIXED_LEN_BYTE_ARRAY(v) => {
                assert_eq!(v.indexes.len(), 1);
                let page_idx = &v.indexes[0];
                assert_eq!(page_idx.null_count.unwrap(), 1);
                assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
                assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
            }
            _ => unreachable!(),
        }
    }

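    // The column data of this file consists of multiple concatenated gzip
    // members; the GZIP codec must keep inflating past the first member to
    // recover all 513 values.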
    #[test]
    fn test_multi_gz() {
        let file = get_test_file("concatenated_gzip_members.parquet");
        let reader = SerializedFileReader::new(file).unwrap();
        let row_group_reader = reader.get_row_group(0).unwrap();
        match row_group_reader.get_column_reader(0).unwrap() {
            ColumnReader::Int64ColumnReader(mut reader) => {
                let mut buffer = Vec::with_capacity(1024);
                let mut def_levels = Vec::with_capacity(1024);
                let (num_records, num_values, num_levels) = reader
                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
                    .unwrap();

                assert_eq!(num_records, 513);
                assert_eq!(num_values, 513);
                assert_eq!(num_levels, 513);

                let expected: Vec<i64> = (1..514).collect();
                assert_eq!(&buffer, &expected);
            }
            _ => unreachable!(),
        }
    }

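    // Per the test file's name, each physical type appears in two columns
    // holding the same data, one PLAIN-encoded and one BYTE_STREAM_SPLIT
    // encoded; every decoded pair must match.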
    #[test]
    fn test_byte_stream_split_extended() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));

        // Use full schema as projected schema
        let mut iter = reader
            .get_row_iter(None)
            .expect("Failed to create row iterator");

        let mut start = 0;
        let end = reader.metadata().file_metadata().num_rows();

        let check_row = |row: Result<Row, ParquetError>| {
            assert!(row.is_ok());
            let r = row.unwrap();
            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
        };

        while start < end {
            match iter.next() {
                Some(row) => check_row(row),
                None => break,
            };
            start += 1;
        }
    }

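    // When row groups are filtered out at read time, the resulting metadata
    // must keep only the surviving row groups together with their matching
    // column index and offset index entries.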
    #[test]
    fn test_filtered_rowgroup_metadata() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 a;
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut file: File = tempfile::tempfile().unwrap();
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
        let data = [1, 2, 3, 4, 5];

        // write 5 row groups
        for idx in 0..5 {
            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
            let mut row_group_writer = file_writer.next_row_group().unwrap();
            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
                writer
                    .typed::<Int32Type>()
                    .write_batch(data_i.as_slice(), None, None)
                    .unwrap();
                writer.close().unwrap();
            }
            row_group_writer.close().unwrap();
            file_writer.flushed_row_groups();
        }
        let file_metadata = file_writer.close().unwrap();

        assert_eq!(file_metadata.num_rows, 25);
        assert_eq!(file_metadata.row_groups.len(), 5);

        // read only the 3rd row group
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        // check we got the expected row group
        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.row_group(0).ordinal(), Some(2));

        // check we only got the relevant page indexes
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();
        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
        let pg_idx = &col_idx[0][0];
        let off_idx_i = &off_idx[0][0];

        // test that we got the index matching the row group
        match pg_idx {
            Index::INT32(int_idx) => {
                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
                assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
            }
            _ => panic!("wrong stats type"),
        }

        // check offset index matches too
        assert_eq!(
            off_idx_i.page_locations[0].offset,
            metadata.row_group(0).column(0).data_page_offset()
        );

        // read non-contiguous row groups
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        // check we got the expected row groups
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
        assert_eq!(metadata.row_group(1).ordinal(), Some(3));

        // check we only got the relevant page indexes
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 2);
        assert_eq!(metadata.offset_index().unwrap().len(), 2);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();

        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
            let pg_idx = &col_idx_i[0];
            let off_idx_i = &off_idx[i][0];

            // test that we got the index matching the row group
            match pg_idx {
                Index::INT32(int_idx) => {
                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                    assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
                    assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
                }
                _ => panic!("wrong stats type"),
            }

            // check offset index matches too
            assert_eq!(
                off_idx_i.page_locations[0].offset,
                metadata.row_group(i).column(0).data_page_offset()
            );
        }
    }
}