// parquet/file/serialized_reader.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
19//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
20
21use crate::basic::{Encoding, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{create_codec, Codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{read_and_decrypt, CryptoContext};
27use crate::errors::{ParquetError, Result};
28use crate::file::page_index::offset_index::OffsetIndexMetaData;
29use crate::file::{
30    metadata::*,
31    properties::{ReaderProperties, ReaderPropertiesPtr},
32    reader::*,
33    statistics,
34};
35use crate::format::{PageHeader, PageLocation, PageType};
36use crate::record::reader::RowIter;
37use crate::record::Row;
38use crate::schema::types::Type as SchemaType;
39use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
40use bytes::Bytes;
41use std::collections::VecDeque;
42use std::iter;
43use std::{fs::File, io::Read, path::Path, sync::Arc};
44use thrift::protocol::TCompactInputProtocol;
45
/// Builds a [`SerializedFileReader`] from an already-open [`File`],
/// eagerly parsing the footer metadata (delegates to [`SerializedFileReader::new`]).
impl TryFrom<File> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(file: File) -> Result<Self> {
        Self::new(file)
    }
}
53
54impl TryFrom<&Path> for SerializedFileReader<File> {
55    type Error = ParquetError;
56
57    fn try_from(path: &Path) -> Result<Self> {
58        let file = File::open(path)?;
59        Self::try_from(file)
60    }
61}
62
63impl TryFrom<String> for SerializedFileReader<File> {
64    type Error = ParquetError;
65
66    fn try_from(path: String) -> Result<Self> {
67        Self::try_from(Path::new(&path))
68    }
69}
70
71impl TryFrom<&str> for SerializedFileReader<File> {
72    type Error = ParquetError;
73
74    fn try_from(path: &str) -> Result<Self> {
75        Self::try_from(Path::new(&path))
76    }
77}
78
/// Conversion into a [`RowIter`]
/// using the full file schema over all row groups.
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        // The reader is boxed and moved into the iterator, which is why the
        // resulting `RowIter` can carry a 'static lifetime.
        RowIter::from_file_into(Box::new(self))
    }
}
89
90// ----------------------------------------------------------------------
91// Implementations of file & row group readers
92
/// A serialized implementation for Parquet [`FileReader`].
pub struct SerializedFileReader<R: ChunkReader> {
    // Shared handle to the underlying storage (file, byte buffer, ...)
    chunk_reader: Arc<R>,
    // Parsed footer metadata for the whole file
    metadata: Arc<ParquetMetaData>,
    // Reader configuration shared with row group / page readers
    props: ReaderPropertiesPtr,
}
99
/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned.
///
/// The closure is boxed as `FnMut`, so it may carry mutable state between
/// invocations.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
104
/// A builder for [`ReadOptions`].
/// For the predicates that are added to the builder,
/// they will be chained using 'AND' to filter the row groups.
#[derive(Default)]
pub struct ReadOptionsBuilder {
    // Row-group filters; ALL must accept a group for it to be kept
    predicates: Vec<ReadGroupPredicate>,
    // When true, page index structures are also loaded
    enable_page_index: bool,
    // Optional reader properties; defaults are built when `None`
    props: Option<ReaderProperties>,
}
114
115impl ReadOptionsBuilder {
116    /// New builder
117    pub fn new() -> Self {
118        Self::default()
119    }
120
121    /// Add a predicate on row group metadata to the reading option,
122    /// Filter only row groups that match the predicate criteria
123    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
124        self.predicates.push(predicate);
125        self
126    }
127
128    /// Add a range predicate on filtering row groups if their midpoints are within
129    /// the Closed-Open range `[start..end) {x | start <= x < end}`
130    pub fn with_range(mut self, start: i64, end: i64) -> Self {
131        assert!(start < end);
132        let predicate = move |rg: &RowGroupMetaData, _: usize| {
133            let mid = get_midpoint_offset(rg);
134            mid >= start && mid < end
135        };
136        self.predicates.push(Box::new(predicate));
137        self
138    }
139
140    /// Enable reading the page index structures described in
141    /// "[Column Index] Layout to Support Page Skipping"
142    ///
143    /// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
144    pub fn with_page_index(mut self) -> Self {
145        self.enable_page_index = true;
146        self
147    }
148
149    /// Set the [`ReaderProperties`] configuration.
150    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
151        self.props = Some(properties);
152        self
153    }
154
155    /// Seal the builder and return the read options
156    pub fn build(self) -> ReadOptions {
157        let props = self
158            .props
159            .unwrap_or_else(|| ReaderProperties::builder().build());
160        ReadOptions {
161            predicates: self.predicates,
162            enable_page_index: self.enable_page_index,
163            props,
164        }
165    }
166}
167
/// A collection of options for reading a Parquet file.
///
/// Currently, only predicates on row group metadata are supported.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
    // Row-group filters; a group is kept only if every predicate accepts it
    predicates: Vec<ReadGroupPredicate>,
    // Whether to also load the page index structures
    enable_page_index: bool,
    // Reader configuration for the resulting reader
    props: ReaderProperties,
}
177
178impl<R: 'static + ChunkReader> SerializedFileReader<R> {
179    /// Creates file reader from a Parquet file.
180    /// Returns error if Parquet file does not exist or is corrupt.
181    pub fn new(chunk_reader: R) -> Result<Self> {
182        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
183        let props = Arc::new(ReaderProperties::builder().build());
184        Ok(Self {
185            chunk_reader: Arc::new(chunk_reader),
186            metadata: Arc::new(metadata),
187            props,
188        })
189    }
190
191    /// Creates file reader from a Parquet file with read options.
192    /// Returns error if Parquet file does not exist or is corrupt.
193    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
194        let mut metadata_builder = ParquetMetaDataReader::new()
195            .parse_and_finish(&chunk_reader)?
196            .into_builder();
197        let mut predicates = options.predicates;
198
199        // Filter row groups based on the predicates
200        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
201            let mut keep = true;
202            for predicate in &mut predicates {
203                if !predicate(&rg_meta, i) {
204                    keep = false;
205                    break;
206                }
207            }
208            if keep {
209                metadata_builder = metadata_builder.add_row_group(rg_meta);
210            }
211        }
212
213        let mut metadata = metadata_builder.build();
214
215        // If page indexes are desired, build them with the filtered set of row groups
216        if options.enable_page_index {
217            let mut reader =
218                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
219            reader.read_page_indexes(&chunk_reader)?;
220            metadata = reader.finish()?;
221        }
222
223        Ok(Self {
224            chunk_reader: Arc::new(chunk_reader),
225            metadata: Arc::new(metadata),
226            props: Arc::new(options.props),
227        })
228    }
229}
230
231/// Get midpoint offset for a row group
232fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
233    let col = meta.column(0);
234    let mut offset = col.data_page_offset();
235    if let Some(dic_offset) = col.dictionary_page_offset() {
236        if offset > dic_offset {
237            offset = dic_offset
238        }
239    };
240    offset + meta.compressed_size() / 2
241}
242
243impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
244    fn metadata(&self) -> &ParquetMetaData {
245        &self.metadata
246    }
247
248    fn num_row_groups(&self) -> usize {
249        self.metadata.num_row_groups()
250    }
251
252    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
253        let row_group_metadata = self.metadata.row_group(i);
254        // Row groups should be processed sequentially.
255        let props = Arc::clone(&self.props);
256        let f = Arc::clone(&self.chunk_reader);
257        Ok(Box::new(SerializedRowGroupReader::new(
258            f,
259            row_group_metadata,
260            self.metadata.offset_index().map(|x| x[i].as_slice()),
261            props,
262        )?))
263    }
264
265    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
266        RowIter::from_file(projection, self)
267    }
268}
269
/// A serialized implementation for Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    // Shared handle to the underlying storage
    chunk_reader: Arc<R>,
    // Metadata of the row group this reader serves
    metadata: &'a RowGroupMetaData,
    // Per-column page locations for this row group, if the offset index was loaded
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    // Reader configuration (codec options, bloom filter reading, ...)
    props: ReaderPropertiesPtr,
    // One entry per column: the column's bloom filter, if it was read
    bloom_filters: Vec<Option<Sbbf>>,
}
278
279impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
280    /// Creates new row group reader from a file, row group metadata and custom config.
281    pub fn new(
282        chunk_reader: Arc<R>,
283        metadata: &'a RowGroupMetaData,
284        offset_index: Option<&'a [OffsetIndexMetaData]>,
285        props: ReaderPropertiesPtr,
286    ) -> Result<Self> {
287        let bloom_filters = if props.read_bloom_filter() {
288            metadata
289                .columns()
290                .iter()
291                .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
292                .collect::<Result<Vec<_>>>()?
293        } else {
294            iter::repeat(None).take(metadata.columns().len()).collect()
295        };
296        Ok(Self {
297            chunk_reader,
298            metadata,
299            offset_index,
300            props,
301            bloom_filters,
302        })
303    }
304}
305
306impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
307    fn metadata(&self) -> &RowGroupMetaData {
308        self.metadata
309    }
310
311    fn num_columns(&self) -> usize {
312        self.metadata.num_columns()
313    }
314
315    // TODO: fix PARQUET-816
316    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
317        let col = self.metadata.column(i);
318
319        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
320
321        let props = Arc::clone(&self.props);
322        Ok(Box::new(SerializedPageReader::new_with_properties(
323            Arc::clone(&self.chunk_reader),
324            col,
325            self.metadata.num_rows() as usize,
326            page_locations,
327            props,
328        )?))
329    }
330
331    /// get bloom filter for the `i`th column
332    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
333        self.bloom_filters[i].as_ref()
334    }
335
336    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
337        RowIter::from_row_group(projection, self)
338    }
339}
340
/// Reads a [`PageHeader`] from the provided [`Read`]
///
/// Page headers are encoded with the thrift compact protocol.
pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
    let mut prot = TCompactInputProtocol::new(input);
    Ok(PageHeader::read_from_in_protocol(&mut prot)?)
}
346
/// Reads and decrypts a [`PageHeader`] from the provided [`Read`], using the
/// decryptor and page-header AAD derived from `crypto_context`.
#[cfg(feature = "encryption")]
pub(crate) fn read_encrypted_page_header<T: Read>(
    input: &mut T,
    crypto_context: Arc<CryptoContext>,
) -> Result<PageHeader> {
    let data_decryptor = crypto_context.data_decryptor();
    let aad = crypto_context.create_page_header_aad()?;

    // Map decryption failures to a column-specific error; the usual cause is
    // a missing or mismatched column key.
    let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
        ParquetError::General(format!(
            "Error decrypting column {}, decryptor may be wrong or missing",
            crypto_context.column_ordinal
        ))
    })?;

    // The decrypted bytes hold a thrift compact-protocol encoded header
    let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
    Ok(PageHeader::read_from_in_protocol(&mut prot)?)
}
365
/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read.
/// If the page header is encrypted [`CryptoContext`] must be provided.
///
/// # Errors
/// Returns an error if `crypto_context` is `None`, or if reading/decrypting
/// the header fails.
#[cfg(feature = "encryption")]
fn read_encrypted_page_header_len<T: Read>(
    input: &mut T,
    crypto_context: Option<Arc<CryptoContext>>,
) -> Result<(usize, PageHeader)> {
    /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
    struct TrackedRead<R> {
        inner: R,
        bytes_read: usize,
    }

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.inner.read(buf)?;
            self.bytes_read += v;
            Ok(v)
        }
    }

    // Fail with a descriptive error rather than panicking when the caller
    // did not supply the required crypto context.
    let crypto_context = crypto_context.ok_or_else(|| {
        general_err!("Crypto context is required to read an encrypted page header")
    })?;

    let mut tracked = TrackedRead {
        inner: input,
        bytes_read: 0,
    };
    let header = read_encrypted_page_header(&mut tracked, crypto_context)?;
    Ok((tracked.bytes_read, header))
}
394
395/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read.
396fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
397    /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
398    struct TrackedRead<R> {
399        inner: R,
400        bytes_read: usize,
401    }
402
403    impl<R: Read> Read for TrackedRead<R> {
404        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
405            let v = self.inner.read(buf)?;
406            self.bytes_read += v;
407            Ok(v)
408        }
409    }
410
411    let mut tracked = TrackedRead {
412        inner: input,
413        bytes_read: 0,
414    };
415    let header = read_page_header(&mut tracked)?;
416    Ok((tracked.bytes_read, header))
417}
418
419/// Decodes a [`Page`] from the provided `buffer`
420pub(crate) fn decode_page(
421    page_header: PageHeader,
422    buffer: Bytes,
423    physical_type: Type,
424    decompressor: Option<&mut Box<dyn Codec>>,
425) -> Result<Page> {
426    // Verify the 32-bit CRC checksum of the page
427    #[cfg(feature = "crc")]
428    if let Some(expected_crc) = page_header.crc {
429        let crc = crc32fast::hash(&buffer);
430        if crc != expected_crc as u32 {
431            return Err(general_err!("Page CRC checksum mismatch"));
432        }
433    }
434
435    // When processing data page v2, depending on enabled compression for the
436    // page, we should account for uncompressed data ('offset') of
437    // repetition and definition levels.
438    //
439    // We always use 0 offset for other pages other than v2, `true` flag means
440    // that compression will be applied if decompressor is defined
441    let mut offset: usize = 0;
442    let mut can_decompress = true;
443
444    if let Some(ref header_v2) = page_header.data_page_header_v2 {
445        offset = (header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length)
446            as usize;
447        // When is_compressed flag is missing the page is considered compressed
448        can_decompress = header_v2.is_compressed.unwrap_or(true);
449    }
450
451    // TODO: page header could be huge because of statistics. We should set a
452    // maximum page header size and abort if that is exceeded.
453    let buffer = match decompressor {
454        Some(decompressor) if can_decompress => {
455            let uncompressed_size = page_header.uncompressed_page_size as usize;
456            let mut decompressed = Vec::with_capacity(uncompressed_size);
457            let compressed = &buffer.as_ref()[offset..];
458            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
459            decompressor.decompress(
460                compressed,
461                &mut decompressed,
462                Some(uncompressed_size - offset),
463            )?;
464
465            if decompressed.len() != uncompressed_size {
466                return Err(general_err!(
467                    "Actual decompressed size doesn't match the expected one ({} vs {})",
468                    decompressed.len(),
469                    uncompressed_size
470                ));
471            }
472
473            Bytes::from(decompressed)
474        }
475        _ => buffer,
476    };
477
478    let result = match page_header.type_ {
479        PageType::DICTIONARY_PAGE => {
480            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
481                ParquetError::General("Missing dictionary page header".to_string())
482            })?;
483            let is_sorted = dict_header.is_sorted.unwrap_or(false);
484            Page::DictionaryPage {
485                buf: buffer,
486                num_values: dict_header.num_values.try_into()?,
487                encoding: Encoding::try_from(dict_header.encoding)?,
488                is_sorted,
489            }
490        }
491        PageType::DATA_PAGE => {
492            let header = page_header
493                .data_page_header
494                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
495            Page::DataPage {
496                buf: buffer,
497                num_values: header.num_values.try_into()?,
498                encoding: Encoding::try_from(header.encoding)?,
499                def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
500                rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
501                statistics: statistics::from_thrift(physical_type, header.statistics)?,
502            }
503        }
504        PageType::DATA_PAGE_V2 => {
505            let header = page_header
506                .data_page_header_v2
507                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
508            let is_compressed = header.is_compressed.unwrap_or(true);
509            Page::DataPageV2 {
510                buf: buffer,
511                num_values: header.num_values.try_into()?,
512                encoding: Encoding::try_from(header.encoding)?,
513                num_nulls: header.num_nulls.try_into()?,
514                num_rows: header.num_rows.try_into()?,
515                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
516                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
517                is_compressed,
518                statistics: statistics::from_thrift(physical_type, header.statistics)?,
519            }
520        }
521        _ => {
522            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
523            unimplemented!("Page type {:?} is not supported", page_header.type_)
524        }
525    };
526
527    Ok(result)
528}
529
/// Decoding state for a [`SerializedPageReader`].
///
/// `Values` scans the column chunk sequentially; `Pages` walks pre-computed
/// page locations taken from the offset index.
enum SerializedPageReaderState {
    Values {
        /// The current byte offset in the reader
        offset: usize,

        /// The length of the chunk in bytes
        remaining_bytes: usize,

        /// If the next page header has already been "peeked", we will cache it and its length here
        next_page_header: Option<Box<PageHeader>>,

        /// The index of the data page within this column chunk
        page_ordinal: usize,

        /// Whether the next page is expected to be a dictionary page
        require_dictionary: bool,
    },
    Pages {
        /// Remaining page locations
        page_locations: VecDeque<PageLocation>,
        /// Remaining dictionary location if any
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
    },
}
556
/// A serialized implementation for Parquet [`PageReader`].
///
/// Reads the pages of a single column chunk, decompressing them on demand and,
/// when the `encryption` feature is active and a crypto context is set,
/// decrypting them as well.
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader
    reader: Arc<R>,

    /// The compression codec for this column chunk. Only set for non-PLAIN codec.
    decompressor: Option<Box<dyn Codec>>,

    /// Column chunk type.
    physical_type: Type,

    /// Progress through the chunk (sequential scan or indexed page locations)
    state: SerializedPageReaderState,

    /// Crypto context carrying objects required for decryption
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
574
impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and metadata
    ///
    /// Uses default [`ReaderProperties`]; see [`Self::new_with_properties`]
    /// to customize them.
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// Stub No-op implementation when encryption is disabled.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Adds any necessary crypto context to this page reader, if encryption is enabled.
    ///
    /// Returns `self` unchanged when the file has no decryptor or the column
    /// chunk carries no crypto metadata (i.e. the column is not encrypted).
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page with custom options.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // If the first indexed page starts after the chunk start, the
                // leading gap is taken to be the dictionary page and a
                // synthetic location is created so it is read first.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                }
            }
            // Without an offset index, pages are discovered by scanning the
            // chunk sequentially from `start` for `len` bytes.
            None => SerializedPageReaderState::Values {
                offset: start as usize,
                remaining_bytes: len as usize,
                next_page_header: None,
                page_ordinal: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            #[cfg(feature = "encryption")]
            crypto_context: None,
        })
    }

    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
    /// Unlike page metadata, an offset can uniquely identify a page.
    ///
    /// This is used when we need to read parquet with row-filter, and we don't want to decompress the page twice.
    /// This function allows us to check if the next page is being cached or read previously.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                ..
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        // No cached header: decode one from the stream and
                        // advance past its serialized bytes.
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        // NOTE(review): the `continue` below advances past the
                        // header only, not the page body — confirm unknown page
                        // types cannot actually occur on this path.
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        // Cache the decoded header so the subsequent
                        // `get_next_page` call does not re-read it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                // A pending dictionary page always precedes the data pages.
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as usize))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as usize))
                } else {
                    Ok(None)
                }
            }
        }
    }
}
730
731impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
732    type Item = Result<Page>;
733
734    fn next(&mut self) -> Option<Self::Item> {
735        self.get_next_page().transpose()
736    }
737}
738
739fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<()> {
740    if header_len > remaining_bytes {
741        return Err(eof_err!("Invalid page header"));
742    }
743    Ok(())
744}
745
746fn verify_page_size(
747    compressed_size: i32,
748    uncompressed_size: i32,
749    remaining_bytes: usize,
750) -> Result<()> {
751    // The page's compressed size should not exceed the remaining bytes that are
752    // available to read. The page's uncompressed size is the expected size
753    // after decompression, which can never be negative.
754    if compressed_size < 0 || compressed_size as usize > remaining_bytes || uncompressed_size < 0 {
755        return Err(eof_err!("Invalid page header"));
756    }
757    Ok(())
758}
759
760impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
761    fn get_next_page(&mut self) -> Result<Option<Page>> {
762        loop {
763            let page = match &mut self.state {
764                SerializedPageReaderState::Values {
765                    offset,
766                    remaining_bytes: remaining,
767                    next_page_header,
768                    page_ordinal,
769                    require_dictionary,
770                } => {
771                    if *remaining == 0 {
772                        return Ok(None);
773                    }
774
775                    let mut read = self.reader.get_read(*offset as u64)?;
776                    let header = if let Some(header) = next_page_header.take() {
777                        *header
778                    } else {
779                        #[cfg(feature = "encryption")]
780                        let (header_len, header) = if self.crypto_context.is_some() {
781                            let crypto_context = page_crypto_context(
782                                &self.crypto_context,
783                                *page_ordinal,
784                                *require_dictionary,
785                            )?;
786                            read_encrypted_page_header_len(&mut read, crypto_context)?
787                        } else {
788                            read_page_header_len(&mut read)?
789                        };
790
791                        #[cfg(not(feature = "encryption"))]
792                        let (header_len, header) = read_page_header_len(&mut read)?;
793
794                        verify_page_header_len(header_len, *remaining)?;
795                        *offset += header_len;
796                        *remaining -= header_len;
797                        header
798                    };
799                    verify_page_size(
800                        header.compressed_page_size,
801                        header.uncompressed_page_size,
802                        *remaining,
803                    )?;
804                    let data_len = header.compressed_page_size as usize;
805                    *offset += data_len;
806                    *remaining -= data_len;
807
808                    if header.type_ == PageType::INDEX_PAGE {
809                        continue;
810                    }
811
812                    let mut buffer = Vec::with_capacity(data_len);
813                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;
814
815                    if read != data_len {
816                        return Err(eof_err!(
817                            "Expected to read {} bytes of page, read only {}",
818                            data_len,
819                            read
820                        ));
821                    }
822
823                    #[cfg(feature = "encryption")]
824                    let crypto_context = page_crypto_context(
825                        &self.crypto_context,
826                        *page_ordinal,
827                        *require_dictionary,
828                    )?;
829                    #[cfg(feature = "encryption")]
830                    let buffer: Vec<u8> = if let Some(crypto_context) = crypto_context {
831                        let decryptor = crypto_context.data_decryptor();
832                        let aad = crypto_context.create_page_aad()?;
833                        decryptor.decrypt(buffer.as_ref(), &aad)?
834                    } else {
835                        buffer
836                    };
837
838                    let page = decode_page(
839                        header,
840                        Bytes::from(buffer),
841                        self.physical_type,
842                        self.decompressor.as_mut(),
843                    )?;
844                    if page.is_data_page() {
845                        *page_ordinal += 1;
846                    } else if page.is_dictionary_page() {
847                        *require_dictionary = false;
848                    }
849                    page
850                }
851                SerializedPageReaderState::Pages {
852                    page_locations,
853                    dictionary_page,
854                    ..
855                } => {
856                    let front = match dictionary_page
857                        .take()
858                        .or_else(|| page_locations.pop_front())
859                    {
860                        Some(front) => front,
861                        None => return Ok(None),
862                    };
863
864                    let page_len = front.compressed_page_size as usize;
865
866                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;
867
868                    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
869                    let header = PageHeader::read_from_in_protocol(&mut prot)?;
870                    let offset = buffer.len() - prot.as_slice().len();
871
872                    let bytes = buffer.slice(offset..);
873                    decode_page(
874                        header,
875                        bytes,
876                        self.physical_type,
877                        self.decompressor.as_mut(),
878                    )?
879                }
880            };
881
882            return Ok(Some(page));
883        }
884    }
885
    /// Returns metadata for the next page without advancing past it, or
    /// `Ok(None)` once the column chunk is exhausted.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                ..
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        // A header was buffered by a previous peek; reuse it.
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        // No buffered header: read one from the chunk at the current offset.
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            // NOTE(review): this advances past the header only, not the
                            // page's data bytes, and leaves `next_page_header` unset, so
                            // the next iteration reads a "header" at the data offset —
                            // confirm this is handled (or unreachable) for real files.
                            continue;
                        };
                        // Buffer the header so get_next_page/skip_next_page won't re-read it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
            } => {
                if dictionary_page.is_some() {
                    // Dictionary pages carry no row counts of their own.
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // Rows in this page = first row of the following page (or the
                    // row-group total for the last page) minus this page's first row.
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }
951
952    fn skip_next_page(&mut self) -> Result<()> {
953        match &mut self.state {
954            SerializedPageReaderState::Values {
955                offset,
956                remaining_bytes,
957                next_page_header,
958                ..
959            } => {
960                if let Some(buffered_header) = next_page_header.take() {
961                    verify_page_size(
962                        buffered_header.compressed_page_size,
963                        buffered_header.uncompressed_page_size,
964                        *remaining_bytes,
965                    )?;
966                    // The next page header has already been peeked, so just advance the offset
967                    *offset += buffered_header.compressed_page_size as usize;
968                    *remaining_bytes -= buffered_header.compressed_page_size as usize;
969                } else {
970                    let mut read = self.reader.get_read(*offset as u64)?;
971                    let (header_len, header) = read_page_header_len(&mut read)?;
972                    verify_page_header_len(header_len, *remaining_bytes)?;
973                    verify_page_size(
974                        header.compressed_page_size,
975                        header.uncompressed_page_size,
976                        *remaining_bytes,
977                    )?;
978                    let data_page_size = header.compressed_page_size as usize;
979                    *offset += header_len + data_page_size;
980                    *remaining_bytes -= header_len + data_page_size;
981                }
982                Ok(())
983            }
984            SerializedPageReaderState::Pages { page_locations, .. } => {
985                page_locations.pop_front();
986
987                Ok(())
988            }
989        }
990    }
991
992    fn at_record_boundary(&mut self) -> Result<bool> {
993        match &mut self.state {
994            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
995            SerializedPageReaderState::Pages { .. } => Ok(true),
996        }
997    }
998}
999
#[cfg(feature = "encryption")]
/// Derives a per-page crypto context from the column-level one, if any:
/// dictionary pages get the dictionary context, data pages the per-ordinal one.
fn page_crypto_context(
    crypto_context: &Option<Arc<CryptoContext>>,
    page_ordinal: usize,
    dictionary_page: bool,
) -> Result<Option<Arc<CryptoContext>>> {
    let per_page = crypto_context.as_ref().map(|c| {
        let ctx = if dictionary_page {
            c.for_dictionary_page()
        } else {
            c.with_page_ordinal(page_ordinal)
        };
        Arc::new(ctx)
    });
    Ok(per_page)
}
1014
1015#[cfg(test)]
1016mod tests {
1017    use std::collections::HashSet;
1018
1019    use bytes::Buf;
1020
1021    use crate::file::properties::{EnabledStatistics, WriterProperties};
1022    use crate::format::BoundaryOrder;
1023
1024    use crate::basic::{self, ColumnOrder};
1025    use crate::column::reader::ColumnReader;
1026    use crate::data_type::private::ParquetValueType;
1027    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1028    use crate::file::page_index::index::{Index, NativeIndex};
1029    use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1030    use crate::file::writer::SerializedFileWriter;
1031    use crate::record::RowAccessor;
1032    use crate::schema::parser::parse_message_type;
1033    use crate::util::test_common::file_util::{get_test_file, get_test_path};
1034
1035    use super::*;
1036
1037    #[test]
1038    fn test_cursor_and_file_has_the_same_behaviour() {
1039        let mut buf: Vec<u8> = Vec::new();
1040        get_test_file("alltypes_plain.parquet")
1041            .read_to_end(&mut buf)
1042            .unwrap();
1043        let cursor = Bytes::from(buf);
1044        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
1045
1046        let test_file = get_test_file("alltypes_plain.parquet");
1047        let read_from_file = SerializedFileReader::new(test_file).unwrap();
1048
1049        let file_iter = read_from_file.get_row_iter(None).unwrap();
1050        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
1051
1052        for (a, b) in file_iter.zip(cursor_iter) {
1053            assert_eq!(a.unwrap(), b.unwrap())
1054        }
1055    }
1056
1057    #[test]
1058    fn test_file_reader_try_from() {
1059        // Valid file path
1060        let test_file = get_test_file("alltypes_plain.parquet");
1061        let test_path_buf = get_test_path("alltypes_plain.parquet");
1062        let test_path = test_path_buf.as_path();
1063        let test_path_str = test_path.to_str().unwrap();
1064
1065        let reader = SerializedFileReader::try_from(test_file);
1066        assert!(reader.is_ok());
1067
1068        let reader = SerializedFileReader::try_from(test_path);
1069        assert!(reader.is_ok());
1070
1071        let reader = SerializedFileReader::try_from(test_path_str);
1072        assert!(reader.is_ok());
1073
1074        let reader = SerializedFileReader::try_from(test_path_str.to_string());
1075        assert!(reader.is_ok());
1076
1077        // Invalid file path
1078        let test_path = Path::new("invalid.parquet");
1079        let test_path_str = test_path.to_str().unwrap();
1080
1081        let reader = SerializedFileReader::try_from(test_path);
1082        assert!(reader.is_err());
1083
1084        let reader = SerializedFileReader::try_from(test_path_str);
1085        assert!(reader.is_err());
1086
1087        let reader = SerializedFileReader::try_from(test_path_str.to_string());
1088        assert!(reader.is_err());
1089    }
1090
1091    #[test]
1092    fn test_file_reader_into_iter() {
1093        let path = get_test_path("alltypes_plain.parquet");
1094        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1095        let iter = reader.into_iter();
1096        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1097
1098        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1099    }
1100
1101    #[test]
1102    fn test_file_reader_into_iter_project() {
1103        let path = get_test_path("alltypes_plain.parquet");
1104        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1105        let schema = "message schema { OPTIONAL INT32 id; }";
1106        let proj = parse_message_type(schema).ok();
1107        let iter = reader.into_iter().project(proj).unwrap();
1108        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1109
1110        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1111    }
1112
1113    #[test]
1114    fn test_reuse_file_chunk() {
1115        // This test covers the case of maintaining the correct start position in a file
1116        // stream for each column reader after initializing and moving to the next one
1117        // (without necessarily reading the entire column).
1118        let test_file = get_test_file("alltypes_plain.parquet");
1119        let reader = SerializedFileReader::new(test_file).unwrap();
1120        let row_group = reader.get_row_group(0).unwrap();
1121
1122        let mut page_readers = Vec::new();
1123        for i in 0..row_group.num_columns() {
1124            page_readers.push(row_group.get_column_page_reader(i).unwrap());
1125        }
1126
1127        // Now buffer each col reader, we do not expect any failures like:
1128        // General("underlying Thrift error: end of file")
1129        for mut page_reader in page_readers {
1130            assert!(page_reader.get_next_page().is_ok());
1131        }
1132    }
1133
    /// End-to-end smoke test over `alltypes_plain.parquet`: checks file-level
    /// metadata, row-group metadata, column orders, and decodes both pages of
    /// column 0 (one dictionary page, one data page).
    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Test contents in row group metadata
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // Check each column order; this writer recorded none, so all are UNDEFINED.
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        // NOTE(review): `while let Ok(..)` ends silently on an Err; the final
        // `page_count == 2` assertion is what would expose a premature stop.
        while let Ok(Some(page)) = page_reader_0.get_next_page() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    // BIT_PACKED is deprecated but is what this legacy file uses.
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        // Exactly one dictionary page followed by one data page.
        assert_eq!(page_count, 2);
    }
1225
    /// Same shape of test as `test_file_reader`, but against a snappy-compressed
    /// file written with DataPage V2: verifies metadata and decodes column 0's
    /// dictionary page plus its V2 data page (levels, nulls, compression flag).
    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        // Unlike alltypes_plain.parquet, this file carries one key/value entry.
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Ok(Some(page)) = page_reader_0.get_next_page() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        // One dictionary page plus one V2 data page.
        assert_eq!(page_count, 2);
    }
1323
    /// Test helper: builds a `SerializedPageReader` for (`row_group`, `column`)
    /// of an already-opened file reader, forwarding the offset-index page
    /// locations when the file has an offset index.
    fn get_serialized_page_reader<R: ChunkReader>(
        file_reader: &SerializedFileReader<R>,
        row_group: usize,
        column: usize,
    ) -> Result<SerializedPageReader<R>> {
        // Construct a row-group reader so we can borrow its metadata, reader
        // properties, and chunk reader (private fields — test-only access).
        let row_group = {
            let row_group_metadata = file_reader.metadata.row_group(row_group);
            let props = Arc::clone(&file_reader.props);
            let f = Arc::clone(&file_reader.chunk_reader);
            SerializedRowGroupReader::new(
                f,
                row_group_metadata,
                file_reader
                    .metadata
                    .offset_index()
                    .map(|x| x[row_group].as_slice()),
                props,
            )?
        };

        let col = row_group.metadata.column(column);

        // Page locations come from the offset index, if present.
        let page_locations = row_group
            .offset_index
            .map(|x| x[column].page_locations.clone());

        let props = Arc::clone(&row_group.props);
        SerializedPageReader::new_with_properties(
            Arc::clone(&row_group.chunk_reader),
            col,
            row_group.metadata.num_rows() as usize,
            page_locations,
            props,
        )
    }
1359
    /// For every column chunk in the file, `peek_next_page_offset` must agree
    /// with the reader's internal state, and every page offset must be unique
    /// across the whole file.
    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    // The peeked offset must match whichever source the reader
                    // will actually use for the next page.
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as usize, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as usize, page_offset);
                            } else {
                                // peek returned Some, so a page must be queued.
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            // Peeking must have buffered the header and left
                            // `offset` pointing at the peeked page.
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    // Each page in the file occupies a distinct offset.
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }
1406
1407    #[test]
1408    fn test_page_iterator() {
1409        let file = get_test_file("alltypes_plain.parquet");
1410        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1411
1412        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1413
1414        // read first page
1415        let page = page_iterator.next();
1416        assert!(page.is_some());
1417        assert!(page.unwrap().is_ok());
1418
1419        // reach end of file
1420        let page = page_iterator.next();
1421        assert!(page.is_none());
1422
1423        let row_group_indices = Box::new(0..1);
1424        let mut page_iterator =
1425            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1426
1427        // read first page
1428        let page = page_iterator.next();
1429        assert!(page.is_some());
1430        assert!(page.unwrap().is_ok());
1431
1432        // reach end of file
1433        let page = page_iterator.next();
1434        assert!(page.is_none());
1435    }
1436
1437    #[test]
1438    fn test_file_reader_key_value_metadata() {
1439        let file = get_test_file("binary.parquet");
1440        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1441
1442        let metadata = file_reader
1443            .metadata
1444            .file_metadata()
1445            .key_value_metadata()
1446            .unwrap();
1447
1448        assert_eq!(metadata.len(), 3);
1449
1450        assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1451
1452        assert_eq!(metadata[1].key, "writer.model.name");
1453        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1454
1455        assert_eq!(metadata[2].key, "parquet.proto.class");
1456        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1457    }
1458
1459    #[test]
1460    fn test_file_reader_optional_metadata() {
1461        // file with optional metadata: bloom filters, encoding stats, column index and offset index.
1462        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1463        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1464
1465        let row_group_metadata = file_reader.metadata.row_group(0);
1466        let col0_metadata = row_group_metadata.column(0);
1467
1468        // test optional bloom filter offset
1469        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1470
1471        // test page encoding stats
1472        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1473
1474        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1475        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1476        assert_eq!(page_encoding_stats.count, 1);
1477
1478        // test optional column index offset
1479        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1480        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1481
1482        // test optional offset index offset
1483        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1484        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1485    }
1486
1487    #[test]
1488    fn test_file_reader_with_no_filter() -> Result<()> {
1489        let test_file = get_test_file("alltypes_plain.parquet");
1490        let origin_reader = SerializedFileReader::new(test_file)?;
1491        // test initial number of row groups
1492        let metadata = origin_reader.metadata();
1493        assert_eq!(metadata.num_row_groups(), 1);
1494        Ok(())
1495    }
1496
1497    #[test]
1498    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1499        let test_file = get_test_file("alltypes_plain.parquet");
1500        let read_options = ReadOptionsBuilder::new()
1501            .with_predicate(Box::new(|_, _| false))
1502            .build();
1503        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1504        let metadata = reader.metadata();
1505        assert_eq!(metadata.num_row_groups(), 0);
1506        Ok(())
1507    }
1508
1509    #[test]
1510    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1511        let test_file = get_test_file("alltypes_plain.parquet");
1512        let origin_reader = SerializedFileReader::new(test_file)?;
1513        // test initial number of row groups
1514        let metadata = origin_reader.metadata();
1515        assert_eq!(metadata.num_row_groups(), 1);
1516        let mid = get_midpoint_offset(metadata.row_group(0));
1517
1518        let test_file = get_test_file("alltypes_plain.parquet");
1519        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1520        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1521        let metadata = reader.metadata();
1522        assert_eq!(metadata.num_row_groups(), 1);
1523
1524        let test_file = get_test_file("alltypes_plain.parquet");
1525        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1526        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1527        let metadata = reader.metadata();
1528        assert_eq!(metadata.num_row_groups(), 0);
1529        Ok(())
1530    }
1531
    /// Combines the predicate filter and the byte-range filter: a row group (and
    /// its page indexes) survives only when BOTH keep it. Exercises all four
    /// true/false × in-range/out-of-range combinations.
    #[test]
    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        let metadata = origin_reader.metadata();
        // Midpoint offset of row group 0 drives the range filters below.
        let mid = get_midpoint_offset(metadata.row_group(0));

        // true, true predicate
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| true))
            .with_range(mid, mid + 1)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        // The surviving row group keeps its column and offset indexes.
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);

        // true, false predicate
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| true))
            .with_range(0, mid)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        // With no row groups left, no page indexes are loaded either.
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());

        // false, true predicate
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| false))
            .with_range(mid, mid + 1)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());

        // false, false predicate
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|_, _| false))
            .with_range(0, mid)
            .build();
        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 0);
        assert!(metadata.column_index().is_none());
        assert!(metadata.offset_index().is_none());
        Ok(())
    }
1592
1593    #[test]
1594    fn test_file_reader_invalid_metadata() {
1595        let data = [
1596            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1597            80, 65, 82, 49,
1598        ];
1599        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1600        assert_eq!(
1601            ret.err().unwrap().to_string(),
1602            "Parquet error: Could not parse metadata: bad data"
1603        );
1604    }
1605
    #[test]
    // Reads the column index and offset index of a single-page file and checks
    // them against values obtained with java parquet-tools:
    // ```
    // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
    // row group 0:
    // column index for column String:
    // Boundary order: ASCENDING
    // page-0  :
    // null count                 min                                  max
    // 0                          Hello                                today
    //
    // offset index for column String:
    // page-0   :
    // offset   compressed size       first row index
    // 4               152                     0
    // ```
    fn test_page_index_reader() {
        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let builder = ReadOptionsBuilder::new();
        //enable read page index
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let column_index = metadata.column_index().unwrap();

        // only one row group
        assert_eq!(column_index.len(), 1);
        // the only column is a BYTE_ARRAY, so the index must be that variant
        let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
            index
        } else {
            unreachable!()
        };

        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
        let index_in_pages = &index.indexes;

        //only one page group
        assert_eq!(index_in_pages.len(), 1);

        // page-0 min/max must match the parquet-tools output above
        let page0 = &index_in_pages[0];
        let min = page0.min.as_ref().unwrap();
        let max = page0.max.as_ref().unwrap();
        assert_eq!(b"Hello", min.as_bytes());
        assert_eq!(b"today", max.as_bytes());

        let offset_indexes = metadata.offset_index().unwrap();
        // only one row group
        assert_eq!(offset_indexes.len(), 1);
        let offset_index = &offset_indexes[0];
        let page_offset = &offset_index[0].page_locations()[0];

        // offset / compressed size / first row index from parquet-tools
        assert_eq!(4, page_offset.offset);
        assert_eq!(152, page_offset.compressed_page_size);
        assert_eq!(0, page_offset.first_row_index);
    }
1667
1668    #[test]
1669    fn test_page_index_reader_out_of_order() {
1670        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1671        let options = ReadOptionsBuilder::new().with_page_index().build();
1672        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
1673        let metadata = reader.metadata();
1674
1675        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1676        let columns = metadata.row_group(0).columns();
1677        let reversed: Vec<_> = columns.iter().cloned().rev().collect();
1678
1679        let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
1680        let mut b = read_columns_indexes(&test_file, &reversed)
1681            .unwrap()
1682            .unwrap();
1683        b.reverse();
1684        assert_eq!(a, b);
1685
1686        let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
1687        let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
1688        b.reverse();
1689        assert_eq!(a, b);
1690    }
1691
1692    #[test]
1693    fn test_page_index_reader_all_type() {
1694        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1695        let builder = ReadOptionsBuilder::new();
1696        //enable read page index
1697        let options = builder.with_page_index().build();
1698        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1699        let reader = reader_result.unwrap();
1700
1701        // Test contents in Parquet metadata
1702        let metadata = reader.metadata();
1703        assert_eq!(metadata.num_row_groups(), 1);
1704
1705        let column_index = metadata.column_index().unwrap();
1706        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
1707
1708        // only one row group
1709        assert_eq!(column_index.len(), 1);
1710        let row_group_metadata = metadata.row_group(0);
1711
1712        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
1713        assert!(!&column_index[0][0].is_sorted());
1714        let boundary_order = &column_index[0][0].get_boundary_order();
1715        assert!(boundary_order.is_some());
1716        matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
1717        if let Index::INT32(index) = &column_index[0][0] {
1718            check_native_page_index(
1719                index,
1720                325,
1721                get_row_group_min_max_bytes(row_group_metadata, 0),
1722                BoundaryOrder::UNORDERED,
1723            );
1724            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
1725        } else {
1726            unreachable!()
1727        };
1728        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
1729        assert!(&column_index[0][1].is_sorted());
1730        if let Index::BOOLEAN(index) = &column_index[0][1] {
1731            assert_eq!(index.indexes.len(), 82);
1732            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
1733        } else {
1734            unreachable!()
1735        };
1736        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
1737        assert!(&column_index[0][2].is_sorted());
1738        if let Index::INT32(index) = &column_index[0][2] {
1739            check_native_page_index(
1740                index,
1741                325,
1742                get_row_group_min_max_bytes(row_group_metadata, 2),
1743                BoundaryOrder::ASCENDING,
1744            );
1745            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
1746        } else {
1747            unreachable!()
1748        };
1749        //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
1750        assert!(&column_index[0][3].is_sorted());
1751        if let Index::INT32(index) = &column_index[0][3] {
1752            check_native_page_index(
1753                index,
1754                325,
1755                get_row_group_min_max_bytes(row_group_metadata, 3),
1756                BoundaryOrder::ASCENDING,
1757            );
1758            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
1759        } else {
1760            unreachable!()
1761        };
1762        //col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
1763        assert!(&column_index[0][4].is_sorted());
1764        if let Index::INT32(index) = &column_index[0][4] {
1765            check_native_page_index(
1766                index,
1767                325,
1768                get_row_group_min_max_bytes(row_group_metadata, 4),
1769                BoundaryOrder::ASCENDING,
1770            );
1771            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
1772        } else {
1773            unreachable!()
1774        };
1775        //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
1776        assert!(!&column_index[0][5].is_sorted());
1777        if let Index::INT64(index) = &column_index[0][5] {
1778            check_native_page_index(
1779                index,
1780                528,
1781                get_row_group_min_max_bytes(row_group_metadata, 5),
1782                BoundaryOrder::UNORDERED,
1783            );
1784            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
1785        } else {
1786            unreachable!()
1787        };
1788        //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
1789        assert!(&column_index[0][6].is_sorted());
1790        if let Index::FLOAT(index) = &column_index[0][6] {
1791            check_native_page_index(
1792                index,
1793                325,
1794                get_row_group_min_max_bytes(row_group_metadata, 6),
1795                BoundaryOrder::ASCENDING,
1796            );
1797            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
1798        } else {
1799            unreachable!()
1800        };
1801        //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
1802        assert!(!&column_index[0][7].is_sorted());
1803        if let Index::DOUBLE(index) = &column_index[0][7] {
1804            check_native_page_index(
1805                index,
1806                528,
1807                get_row_group_min_max_bytes(row_group_metadata, 7),
1808                BoundaryOrder::UNORDERED,
1809            );
1810            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
1811        } else {
1812            unreachable!()
1813        };
1814        //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
1815        assert!(!&column_index[0][8].is_sorted());
1816        if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
1817            check_native_page_index(
1818                index,
1819                974,
1820                get_row_group_min_max_bytes(row_group_metadata, 8),
1821                BoundaryOrder::UNORDERED,
1822            );
1823            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
1824        } else {
1825            unreachable!()
1826        };
1827        //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
1828        assert!(&column_index[0][9].is_sorted());
1829        if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
1830            check_native_page_index(
1831                index,
1832                352,
1833                get_row_group_min_max_bytes(row_group_metadata, 9),
1834                BoundaryOrder::ASCENDING,
1835            );
1836            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
1837        } else {
1838            unreachable!()
1839        };
1840        //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
1841        //Notice: min_max values for each page for this col not exits.
1842        assert!(!&column_index[0][10].is_sorted());
1843        if let Index::NONE = &column_index[0][10] {
1844            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
1845        } else {
1846            unreachable!()
1847        };
1848        //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
1849        assert!(&column_index[0][11].is_sorted());
1850        if let Index::INT32(index) = &column_index[0][11] {
1851            check_native_page_index(
1852                index,
1853                325,
1854                get_row_group_min_max_bytes(row_group_metadata, 11),
1855                BoundaryOrder::ASCENDING,
1856            );
1857            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
1858        } else {
1859            unreachable!()
1860        };
1861        //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
1862        assert!(!&column_index[0][12].is_sorted());
1863        if let Index::INT32(index) = &column_index[0][12] {
1864            check_native_page_index(
1865                index,
1866                325,
1867                get_row_group_min_max_bytes(row_group_metadata, 12),
1868                BoundaryOrder::UNORDERED,
1869            );
1870            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
1871        } else {
1872            unreachable!()
1873        };
1874    }
1875
1876    fn check_native_page_index<T: ParquetValueType>(
1877        row_group_index: &NativeIndex<T>,
1878        page_size: usize,
1879        min_max: (&[u8], &[u8]),
1880        boundary_order: BoundaryOrder,
1881    ) {
1882        assert_eq!(row_group_index.indexes.len(), page_size);
1883        assert_eq!(row_group_index.boundary_order, boundary_order);
1884        row_group_index.indexes.iter().all(|x| {
1885            x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
1886                && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
1887        });
1888    }
1889
1890    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
1891        let statistics = r.column(col_num).statistics().unwrap();
1892        (
1893            statistics.min_bytes_opt().unwrap_or_default(),
1894            statistics.max_bytes_opt().unwrap_or_default(),
1895        )
1896    }
1897
1898    #[test]
1899    fn test_skip_page_with_offset_index() {
1900        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1901        let builder = ReadOptionsBuilder::new();
1902        //enable read page index
1903        let options = builder.with_page_index().build();
1904        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1905        let reader = reader_result.unwrap();
1906
1907        let row_group_reader = reader.get_row_group(0).unwrap();
1908
1909        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
1910        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
1911
1912        let mut vec = vec![];
1913
1914        for i in 0..325 {
1915            if i % 2 == 0 {
1916                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
1917            } else {
1918                column_page_reader.skip_next_page().unwrap();
1919            }
1920        }
1921        //check read all pages.
1922        assert!(column_page_reader.peek_next_page().unwrap().is_none());
1923        assert!(column_page_reader.get_next_page().unwrap().is_none());
1924
1925        assert_eq!(vec.len(), 163);
1926    }
1927
1928    #[test]
1929    fn test_skip_page_without_offset_index() {
1930        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1931
1932        // use default SerializedFileReader without read offsetIndex
1933        let reader_result = SerializedFileReader::new(test_file);
1934        let reader = reader_result.unwrap();
1935
1936        let row_group_reader = reader.get_row_group(0).unwrap();
1937
1938        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
1939        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
1940
1941        let mut vec = vec![];
1942
1943        for i in 0..325 {
1944            if i % 2 == 0 {
1945                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
1946            } else {
1947                column_page_reader.peek_next_page().unwrap().unwrap();
1948                column_page_reader.skip_next_page().unwrap();
1949            }
1950        }
1951        //check read all pages.
1952        assert!(column_page_reader.peek_next_page().unwrap().is_none());
1953        assert!(column_page_reader.get_next_page().unwrap().is_none());
1954
1955        assert_eq!(vec.len(), 163);
1956    }
1957
    #[test]
    fn test_peek_page_with_dictionary_page() {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");
        let builder = ReadOptionsBuilder::new();
        //enable read page index so peek can be answered from the offset index
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();
        let row_group_reader = reader.get_row_group(0).unwrap();

        //use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

        let mut vec = vec![];

        // the first page must be the dictionary page
        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
        assert!(meta.is_dict);
        let page = column_page_reader.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

        for i in 0..352 {
            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
            // checked with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
            // every page has 21 or 20 rows, except the last page which has 10 rows.
            if i != 351 {
                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
            } else {
                // last page first row index is 7290, total row count is 7300,
                // so the last page holds the final 10 rows.
                assert_eq!(meta.num_rows, Some(10));
            }
            assert!(!meta.is_dict);
            vec.push(meta);
            let page = column_page_reader.get_next_page().unwrap().unwrap();
            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
        }

        //check read all pages.
        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 352);
    }
2001
    #[test]
    fn test_peek_page_with_dictionary_page_without_offset_index() {
        let test_file = get_test_file("alltypes_tiny_pages.parquet");

        // default reader: no offset index, so peek parses page headers instead
        let reader_result = SerializedFileReader::new(test_file);
        let reader = reader_result.unwrap();
        let row_group_reader = reader.get_row_group(0).unwrap();

        //use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

        let mut vec = vec![];

        // the first page must be the dictionary page
        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
        assert!(meta.is_dict);
        let page = column_page_reader.get_next_page().unwrap().unwrap();
        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

        for i in 0..352 {
            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
            // checked with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
            // every page has 21 or 20 levels, except the last page which has 10.
            // (without the offset index the header exposes num_levels, not num_rows)
            if i != 351 {
                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
            } else {
                // last page first row index is 7290, total row count is 7300,
                // so the last page holds the final 10 rows.
                assert_eq!(meta.num_levels, Some(10));
            }
            assert!(!meta.is_dict);
            vec.push(meta);
            let page = column_page_reader.get_next_page().unwrap().unwrap();
            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
        }

        //check read all pages.
        assert!(column_page_reader.peek_next_page().unwrap().is_none());
        assert!(column_page_reader.get_next_page().unwrap().is_none());

        assert_eq!(vec.len(), 352);
    }
2043
2044    #[test]
2045    fn test_fixed_length_index() {
2046        let message_type = "
2047        message test_schema {
2048          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2049        }
2050        ";
2051
2052        let schema = parse_message_type(message_type).unwrap();
2053        let mut out = Vec::with_capacity(1024);
2054        let mut writer =
2055            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2056
2057        let mut r = writer.next_row_group().unwrap();
2058        let mut c = r.next_column().unwrap().unwrap();
2059        c.typed::<FixedLenByteArrayType>()
2060            .write_batch(
2061                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2062                Some(&[1, 1, 0, 1]),
2063                None,
2064            )
2065            .unwrap();
2066        c.close().unwrap();
2067        r.close().unwrap();
2068        writer.close().unwrap();
2069
2070        let b = Bytes::from(out);
2071        let options = ReadOptionsBuilder::new().with_page_index().build();
2072        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2073        let index = reader.metadata().column_index().unwrap();
2074
2075        // 1 row group
2076        assert_eq!(index.len(), 1);
2077        let c = &index[0];
2078        // 1 column
2079        assert_eq!(c.len(), 1);
2080
2081        match &c[0] {
2082            Index::FIXED_LEN_BYTE_ARRAY(v) => {
2083                assert_eq!(v.indexes.len(), 1);
2084                let page_idx = &v.indexes[0];
2085                assert_eq!(page_idx.null_count.unwrap(), 1);
2086                assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
2087                assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
2088            }
2089            _ => unreachable!(),
2090        }
2091    }
2092
2093    #[test]
2094    fn test_multi_gz() {
2095        let file = get_test_file("concatenated_gzip_members.parquet");
2096        let reader = SerializedFileReader::new(file).unwrap();
2097        let row_group_reader = reader.get_row_group(0).unwrap();
2098        match row_group_reader.get_column_reader(0).unwrap() {
2099            ColumnReader::Int64ColumnReader(mut reader) => {
2100                let mut buffer = Vec::with_capacity(1024);
2101                let mut def_levels = Vec::with_capacity(1024);
2102                let (num_records, num_values, num_levels) = reader
2103                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2104                    .unwrap();
2105
2106                assert_eq!(num_records, 513);
2107                assert_eq!(num_values, 513);
2108                assert_eq!(num_levels, 513);
2109
2110                let expected: Vec<i64> = (1..514).collect();
2111                assert_eq!(&buffer, &expected);
2112            }
2113            _ => unreachable!(),
2114        }
2115    }
2116
2117    #[test]
2118    fn test_byte_stream_split_extended() {
2119        let path = format!(
2120            "{}/byte_stream_split_extended.gzip.parquet",
2121            arrow::util::test_util::parquet_test_data(),
2122        );
2123        let file = File::open(path).unwrap();
2124        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2125
2126        // Use full schema as projected schema
2127        let mut iter = reader
2128            .get_row_iter(None)
2129            .expect("Failed to create row iterator");
2130
2131        let mut start = 0;
2132        let end = reader.metadata().file_metadata().num_rows();
2133
2134        let check_row = |row: Result<Row, ParquetError>| {
2135            assert!(row.is_ok());
2136            let r = row.unwrap();
2137            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2138            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2139            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2140            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2141            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2142            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2143            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2144        };
2145
2146        while start < end {
2147            match iter.next() {
2148                Some(row) => check_row(row),
2149                None => break,
2150            };
2151            start += 1;
2152        }
2153    }
2154
    #[test]
    // Writes 5 single-column row groups, then re-reads the file with row-group
    // predicates and checks that the filtered metadata keeps only the matching
    // row groups together with their column/offset indexes.
    fn test_filtered_rowgroup_metadata() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 a;
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        // page-level statistics are required so a column index gets written
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut file: File = tempfile::tempfile().unwrap();
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
        let data = [1, 2, 3, 4, 5];

        // write 5 row groups
        for idx in 0..5 {
            // scale the values per row group so each group has distinct stats
            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
            let mut row_group_writer = file_writer.next_row_group().unwrap();
            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
                writer
                    .typed::<Int32Type>()
                    .write_batch(data_i.as_slice(), None, None)
                    .unwrap();
                writer.close().unwrap();
            }
            row_group_writer.close().unwrap();
            // NOTE(review): the return value of this call is discarded, so it
            // appears to have no effect here — confirm before removing.
            file_writer.flushed_row_groups();
        }
        let file_metadata = file_writer.close().unwrap();

        assert_eq!(file_metadata.num_rows, 25);
        assert_eq!(file_metadata.row_groups.len(), 5);

        // read only the 3rd row group
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        // check we got the expected row group
        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.row_group(0).ordinal(), Some(2));

        // check we only got the relevant page indexes
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();
        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
        let pg_idx = &col_idx[0][0];
        let off_idx_i = &off_idx[0][0];

        // test that we got the index matching the row group
        match pg_idx {
            Index::INT32(int_idx) => {
                // the page min/max must agree with the row-group statistics
                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
                assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
            }
            _ => panic!("wrong stats type"),
        }

        // check offset index matches too
        assert_eq!(
            off_idx_i.page_locations[0].offset,
            metadata.row_group(0).column(0).data_page_offset()
        );

        // read non-contiguous row groups (ordinals 1 and 3)
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        // check we got the expected row groups
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
        assert_eq!(metadata.row_group(1).ordinal(), Some(3));

        // check we only got the relevant page indexes
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 2);
        assert_eq!(metadata.offset_index().unwrap().len(), 2);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();

        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
            let pg_idx = &col_idx_i[0];
            let off_idx_i = &off_idx[i][0];

            // test that we got the index matching the row group
            match pg_idx {
                Index::INT32(int_idx) => {
                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                    assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
                    assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
                }
                _ => panic!("wrong stats type"),
            }

            // check offset index matches too
            assert_eq!(
                off_idx_i.page_locations[0].offset,
                metadata.row_group(i).column(0).data_page_offset()
            );
        }
    }
2279}