// parquet/file/serialized_reader.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
19//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
20
21use crate::basic::{PageType, Type};
22use crate::bloom_filter::Sbbf;
23use crate::column::page::{Page, PageMetadata, PageReader};
24use crate::compression::{Codec, create_codec};
25#[cfg(feature = "encryption")]
26use crate::encryption::decrypt::{CryptoContext, read_and_decrypt};
27use crate::errors::{ParquetError, Result};
28use crate::file::metadata::thrift::PageHeader;
29use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
30use crate::file::statistics;
31use crate::file::{
32    metadata::*,
33    properties::{ReaderProperties, ReaderPropertiesPtr},
34    reader::*,
35};
36#[cfg(feature = "encryption")]
37use crate::parquet_thrift::ThriftSliceInputProtocol;
38use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
39use crate::record::Row;
40use crate::record::reader::RowIter;
41use crate::schema::types::{SchemaDescPtr, Type as SchemaType};
42use bytes::Bytes;
43use std::collections::VecDeque;
44use std::{fs::File, io::Read, path::Path, sync::Arc};
45
46impl TryFrom<File> for SerializedFileReader<File> {
47    type Error = ParquetError;
48
49    fn try_from(file: File) -> Result<Self> {
50        Self::new(file)
51    }
52}
53
54impl TryFrom<&Path> for SerializedFileReader<File> {
55    type Error = ParquetError;
56
57    fn try_from(path: &Path) -> Result<Self> {
58        let file = File::open(path)?;
59        Self::try_from(file)
60    }
61}
62
63impl TryFrom<String> for SerializedFileReader<File> {
64    type Error = ParquetError;
65
66    fn try_from(path: String) -> Result<Self> {
67        Self::try_from(Path::new(&path))
68    }
69}
70
71impl TryFrom<&str> for SerializedFileReader<File> {
72    type Error = ParquetError;
73
74    fn try_from(path: &str) -> Result<Self> {
75        Self::try_from(Path::new(&path))
76    }
77}
78
79/// Conversion into a [`RowIter`]
80/// using the full file schema over all row groups.
81impl IntoIterator for SerializedFileReader<File> {
82    type Item = Result<Row>;
83    type IntoIter = RowIter<'static>;
84
85    fn into_iter(self) -> Self::IntoIter {
86        RowIter::from_file_into(Box::new(self))
87    }
88}
89
90// ----------------------------------------------------------------------
91// Implementations of file & row group readers
92
/// A serialized implementation for Parquet [`FileReader`].
pub struct SerializedFileReader<R: ChunkReader> {
    /// Shared handle to the underlying byte source (file, in-memory bytes, ...).
    chunk_reader: Arc<R>,
    /// Parsed footer metadata for the whole file.
    metadata: Arc<ParquetMetaData>,
    /// Reader configuration (codec options, bloom filter / page stats toggles).
    props: ReaderPropertiesPtr,
}
99
/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned.
///
/// Boxed so heterogeneous closures can be stored together in
/// [`ReadOptionsBuilder::with_predicate`].
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;
104
/// A builder for [`ReadOptions`].
/// For the predicates that are added to the builder,
/// they will be chained using 'AND' to filter the row groups.
#[derive(Default)]
pub struct ReadOptionsBuilder {
    /// Row-group filters; ALL must return `true` for a row group to be kept.
    predicates: Vec<ReadGroupPredicate>,
    /// When `true`, page index structures are read after row-group filtering.
    enable_page_index: bool,
    /// Optional reader properties; a default is built if left unset.
    props: Option<ReaderProperties>,
    /// Options controlling how the Parquet footer metadata is decoded.
    metadata_options: ParquetMetaDataOptions,
}
115
impl ReadOptionsBuilder {
    /// New builder, with all options at their defaults.
    pub fn new() -> Self {
        Self::default()
    }

    /// Add a predicate on row group metadata to the reading option,
    /// Filter only row groups that match the predicate criteria
    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
        self.predicates.push(predicate);
        self
    }

    /// Add a range predicate on filtering row groups if their midpoints are within
    /// the Closed-Open range `[start..end) {x | start <= x < end}`
    ///
    /// # Panics
    ///
    /// Panics if `start >= end`.
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        assert!(start < end);
        // Keep a row group when the midpoint of its byte range falls in [start, end).
        let predicate = move |rg: &RowGroupMetaData, _: usize| {
            let mid = get_midpoint_offset(rg);
            mid >= start && mid < end
        };
        self.predicates.push(Box::new(predicate));
        self
    }

    /// Enable reading the page index structures described in
    /// "[Column Index] Layout to Support Page Skipping"
    ///
    /// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    pub fn with_page_index(mut self) -> Self {
        self.enable_page_index = true;
        self
    }

    /// Set the [`ReaderProperties`] configuration.
    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
        self.props = Some(properties);
        self
    }

    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
    /// footer will be skipped.
    pub fn with_parquet_schema(mut self, schema: SchemaDescPtr) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    /// Set whether to convert the [`encoding_stats`] in the Parquet `ColumnMetaData` to a bitmask
    /// (defaults to `false`).
    ///
    /// See [`ColumnChunkMetaData::page_encoding_stats_mask`] for an explanation of why this
    /// might be desirable.
    ///
    /// [`encoding_stats`]:
    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
        self.metadata_options.set_encoding_stats_as_mask(val);
        self
    }

    /// Sets the decoding policy for [`encoding_stats`] in the Parquet `ColumnMetaData`.
    ///
    /// [`encoding_stats`]:
    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L917
    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_encoding_stats_policy(policy);
        self
    }

    /// Sets the decoding policy for [`statistics`] in the Parquet `ColumnMetaData`.
    ///
    /// [`statistics`]:
    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L912
    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_column_stats_policy(policy);
        self
    }

    /// Sets the decoding policy for [`size_statistics`] in the Parquet `ColumnMetaData`.
    ///
    /// [`size_statistics`]:
    /// https://github.com/apache/parquet-format/blob/786142e26740487930ddc3ec5e39d780bd930907/src/main/thrift/parquet.thrift#L936
    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_size_stats_policy(policy);
        self
    }

    /// Seal the builder and return the read options
    pub fn build(self) -> ReadOptions {
        // Fall back to default reader properties when none were supplied.
        let props = self
            .props
            .unwrap_or_else(|| ReaderProperties::builder().build());
        ReadOptions {
            predicates: self.predicates,
            enable_page_index: self.enable_page_index,
            props,
            metadata_options: self.metadata_options,
        }
    }
}
216
/// A collection of options for reading a Parquet file.
///
/// Predicates are currently only supported on row group metadata.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
    /// Row-group filters; ALL must return `true` for a row group to be kept.
    predicates: Vec<ReadGroupPredicate>,
    /// When `true`, page index structures are read after row-group filtering.
    enable_page_index: bool,
    /// Reader configuration used when constructing page readers.
    props: ReaderProperties,
    /// Options controlling how the Parquet footer metadata is decoded.
    metadata_options: ParquetMetaDataOptions,
}
227
impl<R: 'static + ChunkReader> SerializedFileReader<R> {
    /// Creates file reader from a Parquet file.
    /// Returns an error if the Parquet file does not exist or is corrupt.
    pub fn new(chunk_reader: R) -> Result<Self> {
        // Parse the footer metadata eagerly so failures surface at
        // construction time rather than on first read.
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
        // Default reader properties are used; see `new_with_options` to customize.
        let props = Arc::new(ReaderProperties::builder().build());
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props,
        })
    }

    /// Creates file reader from a Parquet file with read options.
    /// Returns an error if the Parquet file does not exist or is corrupt.
    ///
    /// Row groups are filtered through the AND of all `options.predicates`,
    /// and (optionally) page indexes are loaded for the surviving row groups.
    #[allow(deprecated)]
    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
        // Decode the footer, then reopen it as a builder so the row-group
        // list can be rewritten below.
        let mut metadata_builder = ParquetMetaDataReader::new()
            .with_metadata_options(Some(options.metadata_options.clone()))
            .parse_and_finish(&chunk_reader)?
            .into_builder();
        let mut predicates = options.predicates;

        // Filter row groups based on the predicates
        // Note: `i` is the row group's index in the ORIGINAL file, which is
        // what the predicates are documented to receive.
        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
            let mut keep = true;
            for predicate in &mut predicates {
                if !predicate(&rg_meta, i) {
                    keep = false;
                    break;
                }
            }
            if keep {
                metadata_builder = metadata_builder.add_row_group(rg_meta);
            }
        }

        let mut metadata = metadata_builder.build();

        // If page indexes are desired, build them with the filtered set of row groups
        if options.enable_page_index {
            let mut reader =
                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
            reader.read_page_indexes(&chunk_reader)?;
            metadata = reader.finish()?;
        }

        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props: Arc::new(options.props),
        })
    }
}
282
283/// Get midpoint offset for a row group
284fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
285    let col = meta.column(0);
286    let mut offset = col.data_page_offset();
287    if let Some(dic_offset) = col.dictionary_page_offset() {
288        if offset > dic_offset {
289            offset = dic_offset
290        }
291    };
292    offset + meta.compressed_size() / 2
293}
294
295impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
296    fn metadata(&self) -> &ParquetMetaData {
297        &self.metadata
298    }
299
300    fn num_row_groups(&self) -> usize {
301        self.metadata.num_row_groups()
302    }
303
304    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
305        let row_group_metadata = self.metadata.row_group(i);
306        // Row groups should be processed sequentially.
307        let props = Arc::clone(&self.props);
308        let f = Arc::clone(&self.chunk_reader);
309        Ok(Box::new(SerializedRowGroupReader::new(
310            f,
311            row_group_metadata,
312            self.metadata.offset_index().map(|x| x[i].as_slice()),
313            props,
314        )?))
315    }
316
317    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
318        RowIter::from_file(projection, self)
319    }
320}
321
/// A serialized implementation for Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    /// Shared handle to the underlying byte source.
    chunk_reader: Arc<R>,
    /// Metadata for this row group, borrowed from the file-level metadata.
    metadata: &'a RowGroupMetaData,
    /// Per-column page locations for this row group, if the offset index was loaded.
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    /// Reader configuration.
    props: ReaderPropertiesPtr,
    /// One entry per column chunk; `None` when no bloom filter was read.
    bloom_filters: Vec<Option<Sbbf>>,
}
330
331impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
332    /// Creates new row group reader from a file, row group metadata and custom config.
333    pub fn new(
334        chunk_reader: Arc<R>,
335        metadata: &'a RowGroupMetaData,
336        offset_index: Option<&'a [OffsetIndexMetaData]>,
337        props: ReaderPropertiesPtr,
338    ) -> Result<Self> {
339        let bloom_filters = if props.read_bloom_filter() {
340            metadata
341                .columns()
342                .iter()
343                .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
344                .collect::<Result<Vec<_>>>()?
345        } else {
346            std::iter::repeat_n(None, metadata.columns().len()).collect()
347        };
348        Ok(Self {
349            chunk_reader,
350            metadata,
351            offset_index,
352            props,
353            bloom_filters,
354        })
355    }
356}
357
358impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
359    fn metadata(&self) -> &RowGroupMetaData {
360        self.metadata
361    }
362
363    fn num_columns(&self) -> usize {
364        self.metadata.num_columns()
365    }
366
367    // TODO: fix PARQUET-816
368    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
369        let col = self.metadata.column(i);
370
371        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());
372
373        let props = Arc::clone(&self.props);
374        Ok(Box::new(SerializedPageReader::new_with_properties(
375            Arc::clone(&self.chunk_reader),
376            col,
377            usize::try_from(self.metadata.num_rows())?,
378            page_locations,
379            props,
380        )?))
381    }
382
383    /// get bloom filter for the `i`th column
384    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
385        self.bloom_filters[i].as_ref()
386    }
387
388    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
389        RowIter::from_row_group(projection, self)
390    }
391}
392
/// Decodes a [`Page`] from the provided `buffer`
///
/// `page_header` is the already-decoded Thrift header for the page,
/// `physical_type` is the column's physical type (used to interpret page
/// statistics), and `decompressor`, when present, is applied to the page
/// body. For v2 data pages, the repetition/definition level bytes at the
/// front of the buffer are never compressed and are copied through as-is.
///
/// # Errors
///
/// Returns an error on CRC mismatch (with the `crc` feature), implausible
/// v2 header lengths, decompression failures or size mismatches, missing
/// type-specific headers, and unsupported page types.
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the 32-bit CRC checksum of the page
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // When processing data page v2, depending on enabled compression for the
    // page, we should account for uncompressed data ('offset') of
    // repetition and definition levels.
    //
    // We always use 0 offset for other pages other than v2, `true` flag means
    // that compression will be applied if decompressor is defined
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        // Reject negative or oversized level lengths before they can be used
        // to slice the buffer below.
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                    for definition_levels_byte_length ({}) \
                    and repetition_levels_byte_length ({}) \
                    given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        // When is_compressed flag is missing the page is considered compressed
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            if offset > buffer.len() || offset > uncompressed_page_size {
                return Err(general_err!("Invalid page header"));
            }
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            // The first `offset` bytes (v2 level data) are stored uncompressed.
            decompressed.extend_from_slice(&buffer[..offset]);
            // decompressed size of zero corresponds to a page with no non-null values
            // see https://github.com/apache/parquet-format/blob/master/README.md#data-pages
            if decompressed_size > 0 {
                let compressed = &buffer[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        // No decompressor, or a v2 page flagged as uncompressed: use as-is.
        _ => buffer,
    };

    let result = match page_header.r#type {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: dict_header.encoding,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                def_level_encoding: header.definition_level_encoding,
                rep_level_encoding: header.repetition_level_encoding,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        _ => {
            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
            return Err(general_err!(
                "Page type {:?} is not supported",
                page_header.r#type
            ));
        }
    };

    Ok(result)
}
524
/// Decoding state of a [`SerializedPageReader`]: either a raw byte range to
/// scan sequentially (`Values`), or known page locations taken from the
/// offset index (`Pages`).
enum SerializedPageReaderState {
    Values {
        /// The current byte offset in the reader
        /// Note that offset is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        offset: u64,

        /// The number of bytes remaining in the column chunk
        /// Note that remaining_bytes is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        remaining_bytes: u64,

        /// If the next page header has already been "peeked", it is cached here
        next_page_header: Option<Box<PageHeader>>,

        /// The index of the data page within this column chunk
        page_index: usize,

        /// Whether the next page is expected to be a dictionary page
        require_dictionary: bool,
    },
    Pages {
        /// Remaining page locations
        page_locations: VecDeque<PageLocation>,
        /// Remaining dictionary location if any
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
        /// The index of the data page within this column chunk
        page_index: usize,
    },
}
555
/// Per-reader context shared by the page-header decoding paths.
#[derive(Default)]
struct SerializedPageReaderContext {
    /// Controls decoding of page-level statistics
    read_stats: bool,
    /// Crypto context carrying objects required for decryption
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}
564
/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader
    reader: Arc<R>,

    /// The compression codec for this column chunk. `None` when no
    /// decompression is required (see [`create_codec`]).
    decompressor: Option<Box<dyn Codec>>,

    /// Column chunk type.
    physical_type: Type,

    /// Tracks either the raw byte cursor or the remaining page locations.
    state: SerializedPageReaderState,

    /// Stats/decryption configuration for page-header decoding.
    context: SerializedPageReaderContext,
}
580
impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and metadata
    ///
    /// Uses default [`ReaderProperties`]; see [`Self::new_with_properties`]
    /// to customize them.
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// Stub No-op implementation when encryption is disabled.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Adds any necessary crypto context to this page reader, if encryption is enabled.
    ///
    /// Leaves the reader unchanged when the file has no decryptor or the
    /// column chunk carries no crypto metadata.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.context.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page with custom options.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // If the offset of the first page doesn't match the start of the column chunk
                // then the preceding space must contain a dictionary page.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            // No offset index: scan the chunk's byte range sequentially.
            None => SerializedPageReaderState::Values {
                offset: start,
                remaining_bytes: len,
                next_page_header: None,
                page_index: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        let mut context = SerializedPageReaderContext::default();
        if props.read_page_stats() {
            context.read_stats = true;
        }
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context,
        })
    }

    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
    /// Unlike page metadata, an offset can uniquely identify a page.
    ///
    /// This is used when we need to read parquet with row-filter, and we don't want to decompress the page twice.
    /// This function allows us to check if the next page is being cached or read previously.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        // Advance past the header so `offset` now points at the page body.
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        // Cache the freshly read header so the next call can reuse it.
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                // The dictionary page, when still pending, always comes first.
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as u64))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as u64))
                } else {
                    Ok(None)
                }
            }
        }
    }

    /// Decodes one page header from `input`, returning it together with the
    /// number of bytes consumed from the stream.
    fn read_page_header_len<T: Read>(
        context: &SerializedPageReaderContext,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
        struct TrackedRead<R> {
            inner: R,
            bytes_read: usize,
        }

        impl<R: Read> Read for TrackedRead<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let v = self.inner.read(buf)?;
                self.bytes_read += v;
                Ok(v)
            }
        }

        let mut tracked = TrackedRead {
            inner: input,
            bytes_read: 0,
        };
        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
        Ok((tracked.bytes_read, header))
    }

    /// Like [`Self::read_page_header_len`] but for an in-memory buffer: the
    /// cursor position after decoding gives the header length directly.
    fn read_page_header_len_from_bytes(
        context: &SerializedPageReaderContext,
        buffer: &[u8],
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        let mut input = std::io::Cursor::new(buffer);
        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
        let header_len = input.position() as usize;
        Ok((header_len, header))
    }
}
788
789#[cfg(not(feature = "encryption"))]
790impl SerializedPageReaderContext {
791    fn read_page_header<T: Read>(
792        &self,
793        input: &mut T,
794        _page_index: usize,
795        _dictionary_page: bool,
796    ) -> Result<PageHeader> {
797        let mut prot = ThriftReadInputProtocol::new(input);
798        if self.read_stats {
799            Ok(PageHeader::read_thrift(&mut prot)?)
800        } else {
801            Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
802        }
803    }
804
805    fn decrypt_page_data<T>(
806        &self,
807        buffer: T,
808        _page_index: usize,
809        _dictionary_page: bool,
810    ) -> Result<T> {
811        Ok(buffer)
812    }
813}
814
#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    /// Reads a page header from `input`, decrypting it first when a crypto
    /// context applies to this column.
    ///
    /// `page_index` and `dictionary_page` identify the page so the correct
    /// AAD (additional authenticated data) can be derived for decryption.
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                // Plaintext column: parse the thrift header straight from the
                // stream. (The redundant local `use` of `PageHeader` was
                // removed; it is already imported at module level.)
                let mut prot = ThriftReadInputProtocol::new(input);
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                // Parse the header from the decrypted plaintext buffer.
                let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
        }
    }

    /// Decrypts page data when this column is encrypted; otherwise returns
    /// the buffer unchanged.
    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    /// Builds the per-page [`CryptoContext`] for this column, if encrypted:
    /// dictionary pages get a dedicated context, data pages one keyed by
    /// their page ordinal.
    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}
885
886impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
887    type Item = Result<Page>;
888
889    fn next(&mut self) -> Option<Self::Item> {
890        self.get_next_page().transpose()
891    }
892}
893
894fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
895    if header_len as u64 > remaining_bytes {
896        return Err(eof_err!("Invalid page header"));
897    }
898    Ok(())
899}
900
901fn verify_page_size(
902    compressed_size: i32,
903    uncompressed_size: i32,
904    remaining_bytes: u64,
905) -> Result<()> {
906    // The page's compressed size should not exceed the remaining bytes that are
907    // available to read. The page's uncompressed size is the expected size
908    // after decompression, which can never be negative.
909    if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
910        return Err(eof_err!("Invalid page header"));
911    }
912    Ok(())
913}
914
impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    // Reads and decodes the next page in the column chunk, skipping page
    // types that cannot be decoded (e.g. INDEX_PAGE). Returns `Ok(None)` when
    // the chunk is exhausted.
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                // Sequential mode: page boundaries are discovered by parsing
                // each page header in turn from the stream.
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset)?;
                    // Reuse a header buffered by `peek_next_page` if present;
                    // otherwise parse one and advance the cursor past it.
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len as u64;
                        *remaining -= header_len as u64;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    let data_start = *offset;
                    // Advance past the page data before deciding whether to
                    // decode it, so skipped pages still move the cursor.
                    *offset += data_len as u64;
                    *remaining -= data_len as u64;

                    // Index pages are not decodable; skip to the next page.
                    if header.r#type == PageType::INDEX_PAGE {
                        continue;
                    }

                    let buffer = self.reader.get_bytes(data_start, data_len)?;

                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        buffer,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    // Track the page ordinal (used e.g. for decryption AADs)
                    // and clear the dictionary expectation once it was read.
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                // Indexed mode: page locations come from the offset index, so
                // each page is fetched with a single ranged read.
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    // Serve the dictionary page (if any) before the data pages
                    // listed in the offset index.
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    // The fetched range covers header + data; split them by
                    // parsing the header out of the buffer first.
                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    // Returns metadata for the next page without consuming it. In `Values`
    // mode the parsed header is buffered so `get_next_page`/`skip_next_page`
    // can reuse it.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            // NOTE(review): this advances past the header but not the
                            // page's data bytes — confirm the skipped page's body is
                            // accounted for before the next header read.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    // Dictionary pages carry no row counts of their own.
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    // Row count is the gap between this page's first row index
                    // and the next page's (or the row group total for the last
                    // page).
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    // Skips the next page without decompressing or decoding it.
    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The next page header has already been peeked, so just advance the offset
                    *offset += buffered_header.compressed_page_size as u64;
                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
                } else {
                    let mut read = self.reader.get_read(*offset)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // Advance past both the header and its page data.
                    let data_page_size = header.compressed_page_size as u64;
                    *offset += header_len as u64 + data_page_size;
                    *remaining_bytes -= header_len as u64 + data_page_size;
                }
                // The first skipped page satisfies the dictionary expectation;
                // subsequent skips count as data pages.
                if *require_dictionary {
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // If a dictionary page exists, consume it by taking it (sets to None)
                    dictionary_page.take();
                } else {
                    // If no dictionary page exists, simply pop the data page from page_locations
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            SerializedPageReaderState::Values { .. } => match self.peek_next_page()? {
                None => Ok(true),
                // V2 data pages must start at record boundaries per the parquet
                // spec, so the current page ends at one.
                Some(metadata) => Ok(metadata.num_rows.is_some()),
            },
            // With an offset index, pages are known to be record-aligned.
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}
1171
1172#[cfg(test)]
1173mod tests {
1174    use std::collections::HashSet;
1175
1176    use bytes::Buf;
1177
1178    use crate::file::page_index::column_index::{
1179        ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
1180    };
1181    use crate::file::properties::{EnabledStatistics, WriterProperties};
1182
1183    use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
1184    use crate::column::reader::ColumnReader;
1185    use crate::data_type::private::ParquetValueType;
1186    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
1187    use crate::file::metadata::thrift::DataPageHeaderV2;
1188    #[allow(deprecated)]
1189    use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
1190    use crate::file::writer::SerializedFileWriter;
1191    use crate::record::RowAccessor;
1192    use crate::schema::parser::parse_message_type;
1193    use crate::util::test_common::file_util::{get_test_file, get_test_path};
1194
1195    use super::*;
1196
1197    #[test]
1198    fn test_decode_page_invalid_offset() {
1199        let page_header = PageHeader {
1200            r#type: PageType::DATA_PAGE_V2,
1201            uncompressed_page_size: 10,
1202            compressed_page_size: 10,
1203            data_page_header: None,
1204            index_page_header: None,
1205            dictionary_page_header: None,
1206            crc: None,
1207            data_page_header_v2: Some(DataPageHeaderV2 {
1208                num_nulls: 0,
1209                num_rows: 0,
1210                num_values: 0,
1211                encoding: Encoding::PLAIN,
1212                definition_levels_byte_length: 11,
1213                repetition_levels_byte_length: 0,
1214                is_compressed: None,
1215                statistics: None,
1216            }),
1217        };
1218
1219        let buffer = Bytes::new();
1220        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1221        assert!(
1222            err.to_string()
1223                .contains("DataPage v2 header contains implausible values")
1224        );
1225    }
1226
1227    #[test]
1228    fn test_decode_unsupported_page() {
1229        let mut page_header = PageHeader {
1230            r#type: PageType::INDEX_PAGE,
1231            uncompressed_page_size: 10,
1232            compressed_page_size: 10,
1233            data_page_header: None,
1234            index_page_header: None,
1235            dictionary_page_header: None,
1236            crc: None,
1237            data_page_header_v2: None,
1238        };
1239        let buffer = Bytes::new();
1240        let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
1241        assert_eq!(
1242            err.to_string(),
1243            "Parquet error: Page type INDEX_PAGE is not supported"
1244        );
1245
1246        page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
1247            num_nulls: 0,
1248            num_rows: 0,
1249            num_values: 0,
1250            encoding: Encoding::PLAIN,
1251            definition_levels_byte_length: 11,
1252            repetition_levels_byte_length: 0,
1253            is_compressed: None,
1254            statistics: None,
1255        });
1256        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
1257        assert!(
1258            err.to_string()
1259                .contains("DataPage v2 header contains implausible values")
1260        );
1261    }
1262
1263    #[test]
1264    fn test_cursor_and_file_has_the_same_behaviour() {
1265        let mut buf: Vec<u8> = Vec::new();
1266        get_test_file("alltypes_plain.parquet")
1267            .read_to_end(&mut buf)
1268            .unwrap();
1269        let cursor = Bytes::from(buf);
1270        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();
1271
1272        let test_file = get_test_file("alltypes_plain.parquet");
1273        let read_from_file = SerializedFileReader::new(test_file).unwrap();
1274
1275        let file_iter = read_from_file.get_row_iter(None).unwrap();
1276        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();
1277
1278        for (a, b) in file_iter.zip(cursor_iter) {
1279            assert_eq!(a.unwrap(), b.unwrap())
1280        }
1281    }
1282
1283    #[test]
1284    fn test_file_reader_try_from() {
1285        // Valid file path
1286        let test_file = get_test_file("alltypes_plain.parquet");
1287        let test_path_buf = get_test_path("alltypes_plain.parquet");
1288        let test_path = test_path_buf.as_path();
1289        let test_path_str = test_path.to_str().unwrap();
1290
1291        let reader = SerializedFileReader::try_from(test_file);
1292        assert!(reader.is_ok());
1293
1294        let reader = SerializedFileReader::try_from(test_path);
1295        assert!(reader.is_ok());
1296
1297        let reader = SerializedFileReader::try_from(test_path_str);
1298        assert!(reader.is_ok());
1299
1300        let reader = SerializedFileReader::try_from(test_path_str.to_string());
1301        assert!(reader.is_ok());
1302
1303        // Invalid file path
1304        let test_path = Path::new("invalid.parquet");
1305        let test_path_str = test_path.to_str().unwrap();
1306
1307        let reader = SerializedFileReader::try_from(test_path);
1308        assert!(reader.is_err());
1309
1310        let reader = SerializedFileReader::try_from(test_path_str);
1311        assert!(reader.is_err());
1312
1313        let reader = SerializedFileReader::try_from(test_path_str.to_string());
1314        assert!(reader.is_err());
1315    }
1316
1317    #[test]
1318    fn test_file_reader_into_iter() {
1319        let path = get_test_path("alltypes_plain.parquet");
1320        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1321        let iter = reader.into_iter();
1322        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1323
1324        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1325    }
1326
1327    #[test]
1328    fn test_file_reader_into_iter_project() {
1329        let path = get_test_path("alltypes_plain.parquet");
1330        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
1331        let schema = "message schema { OPTIONAL INT32 id; }";
1332        let proj = parse_message_type(schema).ok();
1333        let iter = reader.into_iter().project(proj).unwrap();
1334        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
1335
1336        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
1337    }
1338
1339    #[test]
1340    fn test_reuse_file_chunk() {
1341        // This test covers the case of maintaining the correct start position in a file
1342        // stream for each column reader after initializing and moving to the next one
1343        // (without necessarily reading the entire column).
1344        let test_file = get_test_file("alltypes_plain.parquet");
1345        let reader = SerializedFileReader::new(test_file).unwrap();
1346        let row_group = reader.get_row_group(0).unwrap();
1347
1348        let mut page_readers = Vec::new();
1349        for i in 0..row_group.num_columns() {
1350            page_readers.push(row_group.get_column_page_reader(i).unwrap());
1351        }
1352
1353        // Now buffer each col reader, we do not expect any failures like:
1354        // General("underlying Thrift error: end of file")
1355        for mut page_reader in page_readers {
1356            assert!(page_reader.get_next_page().is_ok());
1357        }
1358    }
1359
    #[test]
    fn test_file_reader() {
        // End-to-end check over alltypes_plain.parquet: file metadata, row
        // group metadata, and the full page sequence of column 0 (one
        // dictionary page followed by one data page).
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Test contents in row group metadata
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    // BIT_PACKED is deprecated but what this legacy file uses.
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1451
    #[test]
    fn test_file_reader_datapage_v2() {
        // Like test_file_reader, but over a file written with v2 data pages
        // (snappy-compressed): checks metadata plus the dictionary page and
        // DataPageV2 of column 0.
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none()); // page stats are no longer read
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }
1549
1550    #[test]
1551    fn test_file_reader_empty_compressed_datapage_v2() {
1552        // this file has a compressed datapage that un-compresses to 0 bytes
1553        let test_file = get_test_file("page_v2_empty_compressed.parquet");
1554        let reader_result = SerializedFileReader::new(test_file);
1555        assert!(reader_result.is_ok());
1556        let reader = reader_result.unwrap();
1557
1558        // Test contents in Parquet metadata
1559        let metadata = reader.metadata();
1560        assert_eq!(metadata.num_row_groups(), 1);
1561
1562        // Test contents in file metadata
1563        let file_metadata = metadata.file_metadata();
1564        assert!(file_metadata.created_by().is_some());
1565        assert_eq!(
1566            file_metadata.created_by().unwrap(),
1567            "parquet-cpp-arrow version 14.0.2"
1568        );
1569        assert!(file_metadata.key_value_metadata().is_some());
1570        assert_eq!(
1571            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1572            1
1573        );
1574
1575        assert_eq!(file_metadata.num_rows(), 10);
1576        assert_eq!(file_metadata.version(), 2);
1577        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1578        assert_eq!(
1579            file_metadata.column_orders(),
1580            Some(vec![expected_order].as_ref())
1581        );
1582
1583        let row_group_metadata = metadata.row_group(0);
1584
1585        // Check each column order
1586        for i in 0..row_group_metadata.num_columns() {
1587            assert_eq!(file_metadata.column_order(i), expected_order);
1588        }
1589
1590        // Test row group reader
1591        let row_group_reader_result = reader.get_row_group(0);
1592        assert!(row_group_reader_result.is_ok());
1593        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1594        assert_eq!(
1595            row_group_reader.num_columns(),
1596            row_group_metadata.num_columns()
1597        );
1598        assert_eq!(
1599            row_group_reader.metadata().total_byte_size(),
1600            row_group_metadata.total_byte_size()
1601        );
1602
1603        // Test page readers
1604        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1605        assert!(page_reader_0_result.is_ok());
1606        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1607        let mut page_count = 0;
1608        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1609            let is_expected_page = match page {
1610                Page::DictionaryPage {
1611                    buf,
1612                    num_values,
1613                    encoding,
1614                    is_sorted,
1615                } => {
1616                    assert_eq!(buf.len(), 0);
1617                    assert_eq!(num_values, 0);
1618                    assert_eq!(encoding, Encoding::PLAIN);
1619                    assert!(!is_sorted);
1620                    true
1621                }
1622                Page::DataPageV2 {
1623                    buf,
1624                    num_values,
1625                    encoding,
1626                    num_nulls,
1627                    num_rows,
1628                    def_levels_byte_len,
1629                    rep_levels_byte_len,
1630                    is_compressed,
1631                    statistics,
1632                } => {
1633                    assert_eq!(buf.len(), 3);
1634                    assert_eq!(num_values, 10);
1635                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1636                    assert_eq!(num_nulls, 10);
1637                    assert_eq!(num_rows, 10);
1638                    assert_eq!(def_levels_byte_len, 2);
1639                    assert_eq!(rep_levels_byte_len, 0);
1640                    assert!(is_compressed);
1641                    assert!(statistics.is_none()); // page stats are no longer read
1642                    true
1643                }
1644                _ => false,
1645            };
1646            assert!(is_expected_page);
1647            page_count += 1;
1648        }
1649        assert_eq!(page_count, 2);
1650    }
1651
1652    #[test]
1653    fn test_file_reader_empty_datapage_v2() {
1654        // this file has 0 bytes compressed datapage that un-compresses to 0 bytes
1655        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
1656        let reader_result = SerializedFileReader::new(test_file);
1657        assert!(reader_result.is_ok());
1658        let reader = reader_result.unwrap();
1659
1660        // Test contents in Parquet metadata
1661        let metadata = reader.metadata();
1662        assert_eq!(metadata.num_row_groups(), 1);
1663
1664        // Test contents in file metadata
1665        let file_metadata = metadata.file_metadata();
1666        assert!(file_metadata.created_by().is_some());
1667        assert_eq!(
1668            file_metadata.created_by().unwrap(),
1669            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
1670        );
1671        assert!(file_metadata.key_value_metadata().is_some());
1672        assert_eq!(
1673            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1674            2
1675        );
1676
1677        assert_eq!(file_metadata.num_rows(), 1);
1678        assert_eq!(file_metadata.version(), 1);
1679        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1680        assert_eq!(
1681            file_metadata.column_orders(),
1682            Some(vec![expected_order].as_ref())
1683        );
1684
1685        let row_group_metadata = metadata.row_group(0);
1686
1687        // Check each column order
1688        for i in 0..row_group_metadata.num_columns() {
1689            assert_eq!(file_metadata.column_order(i), expected_order);
1690        }
1691
1692        // Test row group reader
1693        let row_group_reader_result = reader.get_row_group(0);
1694        assert!(row_group_reader_result.is_ok());
1695        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1696        assert_eq!(
1697            row_group_reader.num_columns(),
1698            row_group_metadata.num_columns()
1699        );
1700        assert_eq!(
1701            row_group_reader.metadata().total_byte_size(),
1702            row_group_metadata.total_byte_size()
1703        );
1704
1705        // Test page readers
1706        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1707        assert!(page_reader_0_result.is_ok());
1708        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1709        let mut page_count = 0;
1710        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1711            let is_expected_page = match page {
1712                Page::DataPageV2 {
1713                    buf,
1714                    num_values,
1715                    encoding,
1716                    num_nulls,
1717                    num_rows,
1718                    def_levels_byte_len,
1719                    rep_levels_byte_len,
1720                    is_compressed,
1721                    statistics,
1722                } => {
1723                    assert_eq!(buf.len(), 2);
1724                    assert_eq!(num_values, 1);
1725                    assert_eq!(encoding, Encoding::PLAIN);
1726                    assert_eq!(num_nulls, 1);
1727                    assert_eq!(num_rows, 1);
1728                    assert_eq!(def_levels_byte_len, 2);
1729                    assert_eq!(rep_levels_byte_len, 0);
1730                    assert!(is_compressed);
1731                    assert!(statistics.is_none());
1732                    true
1733                }
1734                _ => false,
1735            };
1736            assert!(is_expected_page);
1737            page_count += 1;
1738        }
1739        assert_eq!(page_count, 1);
1740    }
1741
1742    fn get_serialized_page_reader<R: ChunkReader>(
1743        file_reader: &SerializedFileReader<R>,
1744        row_group: usize,
1745        column: usize,
1746    ) -> Result<SerializedPageReader<R>> {
1747        let row_group = {
1748            let row_group_metadata = file_reader.metadata.row_group(row_group);
1749            let props = Arc::clone(&file_reader.props);
1750            let f = Arc::clone(&file_reader.chunk_reader);
1751            SerializedRowGroupReader::new(
1752                f,
1753                row_group_metadata,
1754                file_reader
1755                    .metadata
1756                    .offset_index()
1757                    .map(|x| x[row_group].as_slice()),
1758                props,
1759            )?
1760        };
1761
1762        let col = row_group.metadata.column(column);
1763
1764        let page_locations = row_group
1765            .offset_index
1766            .map(|x| x[column].page_locations.clone());
1767
1768        let props = Arc::clone(&row_group.props);
1769        SerializedPageReader::new_with_properties(
1770            Arc::clone(&row_group.chunk_reader),
1771            col,
1772            usize::try_from(row_group.metadata.num_rows())?,
1773            page_locations,
1774            props,
1775        )
1776    }
1777
    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        // For every page of every column chunk, `peek_next_page_offset` must
        // report the same file offset that the subsequent `get_next_page` call
        // actually reads from, regardless of which internal state the page
        // reader is in (`Pages` driven by an offset index, or `Values` parsing
        // page headers directly).
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        // Track every peeked offset; each must be unique across the file.
        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    // Cross-check the peeked offset against the reader's
                    // internal state before consuming the page.
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            // A pending dictionary page takes precedence over
                            // the next data page location.
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as u64, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as u64, page_offset);
                            } else {
                                // peek returned Some, so a page must be queued
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            // Peeking must have buffered the next page header
                            // and left `offset` pointing at that page.
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    // Consuming the page must succeed, and each page offset
                    // must be seen exactly once.
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }
1824
1825    #[test]
1826    fn test_page_iterator() {
1827        let file = get_test_file("alltypes_plain.parquet");
1828        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1829
1830        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1831
1832        // read first page
1833        let page = page_iterator.next();
1834        assert!(page.is_some());
1835        assert!(page.unwrap().is_ok());
1836
1837        // reach end of file
1838        let page = page_iterator.next();
1839        assert!(page.is_none());
1840
1841        let row_group_indices = Box::new(0..1);
1842        let mut page_iterator =
1843            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1844
1845        // read first page
1846        let page = page_iterator.next();
1847        assert!(page.is_some());
1848        assert!(page.unwrap().is_ok());
1849
1850        // reach end of file
1851        let page = page_iterator.next();
1852        assert!(page.is_none());
1853    }
1854
1855    #[test]
1856    fn test_file_reader_key_value_metadata() {
1857        let file = get_test_file("binary.parquet");
1858        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1859
1860        let metadata = file_reader
1861            .metadata
1862            .file_metadata()
1863            .key_value_metadata()
1864            .unwrap();
1865
1866        assert_eq!(metadata.len(), 3);
1867
1868        assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1869
1870        assert_eq!(metadata[1].key, "writer.model.name");
1871        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1872
1873        assert_eq!(metadata[2].key, "parquet.proto.class");
1874        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1875    }
1876
1877    #[test]
1878    fn test_file_reader_optional_metadata() {
1879        // file with optional metadata: bloom filters, encoding stats, column index and offset index.
1880        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1881        let options = ReadOptionsBuilder::new()
1882            .with_encoding_stats_as_mask(false)
1883            .build();
1884        let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());
1885
1886        let row_group_metadata = file_reader.metadata.row_group(0);
1887        let col0_metadata = row_group_metadata.column(0);
1888
1889        // test optional bloom filter offset
1890        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1891
1892        // test page encoding stats
1893        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1894
1895        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1896        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1897        assert_eq!(page_encoding_stats.count, 1);
1898
1899        // test optional column index offset
1900        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1901        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1902
1903        // test optional offset index offset
1904        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1905        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1906    }
1907
1908    #[test]
1909    fn test_file_reader_page_stats_mask() {
1910        let file = get_test_file("alltypes_tiny_pages.parquet");
1911        let options = ReadOptionsBuilder::new()
1912            .with_encoding_stats_as_mask(true)
1913            .build();
1914        let file_reader = Arc::new(SerializedFileReader::new_with_options(file, options).unwrap());
1915
1916        let row_group_metadata = file_reader.metadata.row_group(0);
1917
1918        // test page encoding stats
1919        let page_encoding_stats = row_group_metadata
1920            .column(0)
1921            .page_encoding_stats_mask()
1922            .unwrap();
1923        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
1924        let page_encoding_stats = row_group_metadata
1925            .column(2)
1926            .page_encoding_stats_mask()
1927            .unwrap();
1928        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
1929    }
1930
1931    #[test]
1932    fn test_file_reader_page_stats_skipped() {
1933        let file = get_test_file("alltypes_tiny_pages.parquet");
1934
1935        // test skipping all
1936        let options = ReadOptionsBuilder::new()
1937            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
1938            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll)
1939            .build();
1940        let file_reader = Arc::new(
1941            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1942        );
1943
1944        let row_group_metadata = file_reader.metadata.row_group(0);
1945        for column in row_group_metadata.columns() {
1946            assert!(column.page_encoding_stats().is_none());
1947            assert!(column.page_encoding_stats_mask().is_none());
1948            assert!(column.statistics().is_none());
1949        }
1950
1951        // test skipping all but one column
1952        let options = ReadOptionsBuilder::new()
1953            .with_encoding_stats_as_mask(true)
1954            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1955            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
1956            .build();
1957        let file_reader = Arc::new(
1958            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1959        );
1960
1961        let row_group_metadata = file_reader.metadata.row_group(0);
1962        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1963            assert!(column.page_encoding_stats().is_none());
1964            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
1965            assert_eq!(column.statistics().is_some(), idx == 0);
1966        }
1967    }
1968
1969    #[test]
1970    fn test_file_reader_size_stats_skipped() {
1971        let file = get_test_file("repeated_primitive_no_list.parquet");
1972
1973        // test skipping all
1974        let options = ReadOptionsBuilder::new()
1975            .with_size_stats_policy(ParquetStatisticsPolicy::SkipAll)
1976            .build();
1977        let file_reader = Arc::new(
1978            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1979        );
1980
1981        let row_group_metadata = file_reader.metadata.row_group(0);
1982        for column in row_group_metadata.columns() {
1983            assert!(column.repetition_level_histogram().is_none());
1984            assert!(column.definition_level_histogram().is_none());
1985            assert!(column.unencoded_byte_array_data_bytes().is_none());
1986        }
1987
1988        // test skipping all but one column
1989        let options = ReadOptionsBuilder::new()
1990            .with_encoding_stats_as_mask(true)
1991            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]))
1992            .build();
1993        let file_reader = Arc::new(
1994            SerializedFileReader::new_with_options(file.try_clone().unwrap(), options).unwrap(),
1995        );
1996
1997        let row_group_metadata = file_reader.metadata.row_group(0);
1998        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
1999            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
2000            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
2001            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
2002        }
2003    }
2004
2005    #[test]
2006    fn test_file_reader_with_no_filter() -> Result<()> {
2007        let test_file = get_test_file("alltypes_plain.parquet");
2008        let origin_reader = SerializedFileReader::new(test_file)?;
2009        // test initial number of row groups
2010        let metadata = origin_reader.metadata();
2011        assert_eq!(metadata.num_row_groups(), 1);
2012        Ok(())
2013    }
2014
2015    #[test]
2016    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
2017        let test_file = get_test_file("alltypes_plain.parquet");
2018        let read_options = ReadOptionsBuilder::new()
2019            .with_predicate(Box::new(|_, _| false))
2020            .build();
2021        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2022        let metadata = reader.metadata();
2023        assert_eq!(metadata.num_row_groups(), 0);
2024        Ok(())
2025    }
2026
2027    #[test]
2028    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
2029        let test_file = get_test_file("alltypes_plain.parquet");
2030        let origin_reader = SerializedFileReader::new(test_file)?;
2031        // test initial number of row groups
2032        let metadata = origin_reader.metadata();
2033        assert_eq!(metadata.num_row_groups(), 1);
2034        let mid = get_midpoint_offset(metadata.row_group(0));
2035
2036        let test_file = get_test_file("alltypes_plain.parquet");
2037        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
2038        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2039        let metadata = reader.metadata();
2040        assert_eq!(metadata.num_row_groups(), 1);
2041
2042        let test_file = get_test_file("alltypes_plain.parquet");
2043        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
2044        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2045        let metadata = reader.metadata();
2046        assert_eq!(metadata.num_row_groups(), 0);
2047        Ok(())
2048    }
2049
2050    #[test]
2051    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
2052        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2053        let origin_reader = SerializedFileReader::new(test_file)?;
2054        let metadata = origin_reader.metadata();
2055        let mid = get_midpoint_offset(metadata.row_group(0));
2056
2057        // true, true predicate
2058        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2059        let read_options = ReadOptionsBuilder::new()
2060            .with_page_index()
2061            .with_predicate(Box::new(|_, _| true))
2062            .with_range(mid, mid + 1)
2063            .build();
2064        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2065        let metadata = reader.metadata();
2066        assert_eq!(metadata.num_row_groups(), 1);
2067        assert_eq!(metadata.column_index().unwrap().len(), 1);
2068        assert_eq!(metadata.offset_index().unwrap().len(), 1);
2069
2070        // true, false predicate
2071        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2072        let read_options = ReadOptionsBuilder::new()
2073            .with_page_index()
2074            .with_predicate(Box::new(|_, _| true))
2075            .with_range(0, mid)
2076            .build();
2077        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2078        let metadata = reader.metadata();
2079        assert_eq!(metadata.num_row_groups(), 0);
2080        assert!(metadata.column_index().is_none());
2081        assert!(metadata.offset_index().is_none());
2082
2083        // false, true predicate
2084        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2085        let read_options = ReadOptionsBuilder::new()
2086            .with_page_index()
2087            .with_predicate(Box::new(|_, _| false))
2088            .with_range(mid, mid + 1)
2089            .build();
2090        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2091        let metadata = reader.metadata();
2092        assert_eq!(metadata.num_row_groups(), 0);
2093        assert!(metadata.column_index().is_none());
2094        assert!(metadata.offset_index().is_none());
2095
2096        // false, false predicate
2097        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2098        let read_options = ReadOptionsBuilder::new()
2099            .with_page_index()
2100            .with_predicate(Box::new(|_, _| false))
2101            .with_range(0, mid)
2102            .build();
2103        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
2104        let metadata = reader.metadata();
2105        assert_eq!(metadata.num_row_groups(), 0);
2106        assert!(metadata.column_index().is_none());
2107        assert!(metadata.offset_index().is_none());
2108        Ok(())
2109    }
2110
2111    #[test]
2112    fn test_file_reader_invalid_metadata() {
2113        let data = [
2114            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
2115            80, 65, 82, 49,
2116        ];
2117        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
2118        assert_eq!(
2119            ret.err().unwrap().to_string(),
2120            "Parquet error: Received empty union from remote ColumnOrder"
2121        );
2122    }
2123
    #[test]
    // Expected page index values below were obtained with java parquet-tools:
    // ```
    // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
    // row group 0:
    // column index for column String:
    // Boundary order: ASCENDING
    // page-0  :
    // null count                 min                                  max
    // 0                          Hello                                today
    //
    // offset index for column String:
    // page-0   :
    // offset   compressed size       first row index
    // 4               152                     0
    // ```
    fn test_page_index_reader() {
        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let builder = ReadOptionsBuilder::new();
        // enable reading the page index (column index + offset index)
        let options = builder.with_page_index().build();
        let reader_result = SerializedFileReader::new_with_options(test_file, options);
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        let column_index = metadata.column_index().unwrap();

        // only one row group
        assert_eq!(column_index.len(), 1);
        // the single column is a BYTE_ARRAY (String) column
        let index = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
            index
        } else {
            unreachable!()
        };

        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);

        // only one page group
        assert_eq!(index.num_pages(), 1);

        // min/max match the parquet-tools output above
        let min = index.min_value(0).unwrap();
        let max = index.max_value(0).unwrap();
        assert_eq!(b"Hello", min.as_bytes());
        assert_eq!(b"today", max.as_bytes());

        let offset_indexes = metadata.offset_index().unwrap();
        // only one row group
        assert_eq!(offset_indexes.len(), 1);
        let offset_index = &offset_indexes[0];
        let page_offset = &offset_index[0].page_locations()[0];

        // offset / compressed size / first row index from the tool output above
        assert_eq!(4, page_offset.offset);
        assert_eq!(152, page_offset.compressed_page_size);
        assert_eq!(0, page_offset.first_row_index);
    }
2183
2184    #[test]
2185    #[allow(deprecated)]
2186    fn test_page_index_reader_out_of_order() {
2187        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2188        let options = ReadOptionsBuilder::new().with_page_index().build();
2189        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
2190        let metadata = reader.metadata();
2191
2192        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2193        let columns = metadata.row_group(0).columns();
2194        let reversed: Vec<_> = columns.iter().cloned().rev().collect();
2195
2196        let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
2197        let mut b = read_columns_indexes(&test_file, &reversed)
2198            .unwrap()
2199            .unwrap();
2200        b.reverse();
2201        assert_eq!(a, b);
2202
2203        let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
2204        let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
2205        b.reverse();
2206        assert_eq!(a, b);
2207    }
2208
2209    #[test]
2210    fn test_page_index_reader_all_type() {
2211        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2212        let builder = ReadOptionsBuilder::new();
2213        //enable read page index
2214        let options = builder.with_page_index().build();
2215        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2216        let reader = reader_result.unwrap();
2217
2218        // Test contents in Parquet metadata
2219        let metadata = reader.metadata();
2220        assert_eq!(metadata.num_row_groups(), 1);
2221
2222        let column_index = metadata.column_index().unwrap();
2223        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
2224
2225        // only one row group
2226        assert_eq!(column_index.len(), 1);
2227        let row_group_metadata = metadata.row_group(0);
2228
2229        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
2230        assert!(!&column_index[0][0].is_sorted());
2231        let boundary_order = &column_index[0][0].get_boundary_order();
2232        assert!(boundary_order.is_some());
2233        matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED);
2234        if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
2235            check_native_page_index(
2236                index,
2237                325,
2238                get_row_group_min_max_bytes(row_group_metadata, 0),
2239                BoundaryOrder::UNORDERED,
2240            );
2241            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2242        } else {
2243            unreachable!()
2244        };
2245        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
2246        assert!(&column_index[0][1].is_sorted());
2247        if let ColumnIndexMetaData::BOOLEAN(index) = &column_index[0][1] {
2248            assert_eq!(index.num_pages(), 82);
2249            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2250        } else {
2251            unreachable!()
2252        };
2253        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2254        assert!(&column_index[0][2].is_sorted());
2255        if let ColumnIndexMetaData::INT32(index) = &column_index[0][2] {
2256            check_native_page_index(
2257                index,
2258                325,
2259                get_row_group_min_max_bytes(row_group_metadata, 2),
2260                BoundaryOrder::ASCENDING,
2261            );
2262            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2263        } else {
2264            unreachable!()
2265        };
2266        //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2267        assert!(&column_index[0][3].is_sorted());
2268        if let ColumnIndexMetaData::INT32(index) = &column_index[0][3] {
2269            check_native_page_index(
2270                index,
2271                325,
2272                get_row_group_min_max_bytes(row_group_metadata, 3),
2273                BoundaryOrder::ASCENDING,
2274            );
2275            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2276        } else {
2277            unreachable!()
2278        };
2279        //col5->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2280        assert!(&column_index[0][4].is_sorted());
2281        if let ColumnIndexMetaData::INT32(index) = &column_index[0][4] {
2282            check_native_page_index(
2283                index,
2284                325,
2285                get_row_group_min_max_bytes(row_group_metadata, 4),
2286                BoundaryOrder::ASCENDING,
2287            );
2288            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2289        } else {
2290            unreachable!()
2291        };
2292        //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
2293        assert!(!&column_index[0][5].is_sorted());
2294        if let ColumnIndexMetaData::INT64(index) = &column_index[0][5] {
2295            check_native_page_index(
2296                index,
2297                528,
2298                get_row_group_min_max_bytes(row_group_metadata, 5),
2299                BoundaryOrder::UNORDERED,
2300            );
2301            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2302        } else {
2303            unreachable!()
2304        };
2305        //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
2306        assert!(&column_index[0][6].is_sorted());
2307        if let ColumnIndexMetaData::FLOAT(index) = &column_index[0][6] {
2308            check_native_page_index(
2309                index,
2310                325,
2311                get_row_group_min_max_bytes(row_group_metadata, 6),
2312                BoundaryOrder::ASCENDING,
2313            );
2314            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2315        } else {
2316            unreachable!()
2317        };
2318        //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
2319        assert!(!&column_index[0][7].is_sorted());
2320        if let ColumnIndexMetaData::DOUBLE(index) = &column_index[0][7] {
2321            check_native_page_index(
2322                index,
2323                528,
2324                get_row_group_min_max_bytes(row_group_metadata, 7),
2325                BoundaryOrder::UNORDERED,
2326            );
2327            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2328        } else {
2329            unreachable!()
2330        };
2331        //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
2332        assert!(!&column_index[0][8].is_sorted());
2333        if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][8] {
2334            check_byte_array_page_index(
2335                index,
2336                974,
2337                get_row_group_min_max_bytes(row_group_metadata, 8),
2338                BoundaryOrder::UNORDERED,
2339            );
2340            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2341        } else {
2342            unreachable!()
2343        };
2344        //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2345        assert!(&column_index[0][9].is_sorted());
2346        if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][9] {
2347            check_byte_array_page_index(
2348                index,
2349                352,
2350                get_row_group_min_max_bytes(row_group_metadata, 9),
2351                BoundaryOrder::ASCENDING,
2352            );
2353            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2354        } else {
2355            unreachable!()
2356        };
2357        //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
2358        //Notice: min_max values for each page for this col not exits.
2359        assert!(!&column_index[0][10].is_sorted());
2360        if let ColumnIndexMetaData::NONE = &column_index[0][10] {
2361            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2362        } else {
2363            unreachable!()
2364        };
2365        //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
2366        assert!(&column_index[0][11].is_sorted());
2367        if let ColumnIndexMetaData::INT32(index) = &column_index[0][11] {
2368            check_native_page_index(
2369                index,
2370                325,
2371                get_row_group_min_max_bytes(row_group_metadata, 11),
2372                BoundaryOrder::ASCENDING,
2373            );
2374            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2375        } else {
2376            unreachable!()
2377        };
2378        //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
2379        assert!(!&column_index[0][12].is_sorted());
2380        if let ColumnIndexMetaData::INT32(index) = &column_index[0][12] {
2381            check_native_page_index(
2382                index,
2383                325,
2384                get_row_group_min_max_bytes(row_group_metadata, 12),
2385                BoundaryOrder::UNORDERED,
2386            );
2387            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2388        } else {
2389            unreachable!()
2390        };
2391    }
2392
2393    fn check_native_page_index<T: ParquetValueType>(
2394        row_group_index: &PrimitiveColumnIndex<T>,
2395        page_size: usize,
2396        min_max: (&[u8], &[u8]),
2397        boundary_order: BoundaryOrder,
2398    ) {
2399        assert_eq!(row_group_index.num_pages() as usize, page_size);
2400        assert_eq!(row_group_index.boundary_order, boundary_order);
2401        assert!(row_group_index.min_values().iter().all(|x| {
2402            x >= &T::try_from_le_slice(min_max.0).unwrap()
2403                && x <= &T::try_from_le_slice(min_max.1).unwrap()
2404        }));
2405    }
2406
2407    fn check_byte_array_page_index(
2408        row_group_index: &ByteArrayColumnIndex,
2409        page_size: usize,
2410        min_max: (&[u8], &[u8]),
2411        boundary_order: BoundaryOrder,
2412    ) {
2413        assert_eq!(row_group_index.num_pages() as usize, page_size);
2414        assert_eq!(row_group_index.boundary_order, boundary_order);
2415        for i in 0..row_group_index.num_pages() as usize {
2416            let x = row_group_index.min_value(i).unwrap();
2417            assert!(x >= min_max.0 && x <= min_max.1);
2418        }
2419    }
2420
2421    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2422        let statistics = r.column(col_num).statistics().unwrap();
2423        (
2424            statistics.min_bytes_opt().unwrap_or_default(),
2425            statistics.max_bytes_opt().unwrap_or_default(),
2426        )
2427    }
2428
2429    #[test]
2430    fn test_skip_next_page_with_dictionary_page() {
2431        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2432        let builder = ReadOptionsBuilder::new();
2433        // enable read page index
2434        let options = builder.with_page_index().build();
2435        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2436        let reader = reader_result.unwrap();
2437
2438        let row_group_reader = reader.get_row_group(0).unwrap();
2439
2440        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
2441        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2442
2443        let mut vec = vec![];
2444
2445        // Step 1: Peek and ensure dictionary page is correctly identified
2446        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2447        assert!(meta.is_dict);
2448
2449        // Step 2: Call skip_next_page to skip the dictionary page
2450        column_page_reader.skip_next_page().unwrap();
2451
2452        // Step 3: Read the next data page after skipping the dictionary page
2453        let page = column_page_reader.get_next_page().unwrap().unwrap();
2454        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2455
2456        // Step 4: Continue reading remaining data pages and verify correctness
2457        for _i in 0..351 {
2458            // 352 total pages, 1 dictionary page is skipped
2459            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2460            assert!(!meta.is_dict); // Verify no dictionary page here
2461            vec.push(meta);
2462
2463            let page = column_page_reader.get_next_page().unwrap().unwrap();
2464            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2465        }
2466
2467        // Step 5: Check if all pages are read
2468        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2469        assert!(column_page_reader.get_next_page().unwrap().is_none());
2470
2471        // Step 6: Verify the number of data pages read (should be 351 data pages)
2472        assert_eq!(vec.len(), 351);
2473    }
2474
2475    #[test]
2476    fn test_skip_page_with_offset_index() {
2477        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2478        let builder = ReadOptionsBuilder::new();
2479        //enable read page index
2480        let options = builder.with_page_index().build();
2481        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2482        let reader = reader_result.unwrap();
2483
2484        let row_group_reader = reader.get_row_group(0).unwrap();
2485
2486        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
2487        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2488
2489        let mut vec = vec![];
2490
2491        for i in 0..325 {
2492            if i % 2 == 0 {
2493                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2494            } else {
2495                column_page_reader.skip_next_page().unwrap();
2496            }
2497        }
2498        //check read all pages.
2499        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2500        assert!(column_page_reader.get_next_page().unwrap().is_none());
2501
2502        assert_eq!(vec.len(), 163);
2503    }
2504
2505    #[test]
2506    fn test_skip_page_without_offset_index() {
2507        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2508
2509        // use default SerializedFileReader without read offsetIndex
2510        let reader_result = SerializedFileReader::new(test_file);
2511        let reader = reader_result.unwrap();
2512
2513        let row_group_reader = reader.get_row_group(0).unwrap();
2514
2515        //use 'int_col', Boundary order: ASCENDING, total 325 pages.
2516        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2517
2518        let mut vec = vec![];
2519
2520        for i in 0..325 {
2521            if i % 2 == 0 {
2522                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2523            } else {
2524                column_page_reader.peek_next_page().unwrap().unwrap();
2525                column_page_reader.skip_next_page().unwrap();
2526            }
2527        }
2528        //check read all pages.
2529        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2530        assert!(column_page_reader.get_next_page().unwrap().is_none());
2531
2532        assert_eq!(vec.len(), 163);
2533    }
2534
2535    #[test]
2536    fn test_peek_page_with_dictionary_page() {
2537        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2538        let builder = ReadOptionsBuilder::new();
2539        //enable read page index
2540        let options = builder.with_page_index().build();
2541        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2542        let reader = reader_result.unwrap();
2543        let row_group_reader = reader.get_row_group(0).unwrap();
2544
2545        //use 'string_col', Boundary order: UNORDERED, total 352 data ages and 1 dictionary page.
2546        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2547
2548        let mut vec = vec![];
2549
2550        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2551        assert!(meta.is_dict);
2552        let page = column_page_reader.get_next_page().unwrap().unwrap();
2553        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2554
2555        for i in 0..352 {
2556            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2557            // have checked with `parquet-tools column-index   -c string_col  ./alltypes_tiny_pages.parquet`
2558            // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
2559            if i != 351 {
2560                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2561            } else {
2562                // last page first row index is 7290, total row count is 7300
2563                // because first row start with zero, last page row count should be 10.
2564                assert_eq!(meta.num_rows, Some(10));
2565            }
2566            assert!(!meta.is_dict);
2567            vec.push(meta);
2568            let page = column_page_reader.get_next_page().unwrap().unwrap();
2569            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2570        }
2571
2572        //check read all pages.
2573        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2574        assert!(column_page_reader.get_next_page().unwrap().is_none());
2575
2576        assert_eq!(vec.len(), 352);
2577    }
2578
2579    #[test]
2580    fn test_peek_page_with_dictionary_page_without_offset_index() {
2581        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2582
2583        let reader_result = SerializedFileReader::new(test_file);
2584        let reader = reader_result.unwrap();
2585        let row_group_reader = reader.get_row_group(0).unwrap();
2586
2587        //use 'string_col', Boundary order: UNORDERED, total 352 data ages and 1 dictionary page.
2588        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2589
2590        let mut vec = vec![];
2591
2592        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2593        assert!(meta.is_dict);
2594        let page = column_page_reader.get_next_page().unwrap().unwrap();
2595        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2596
2597        for i in 0..352 {
2598            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2599            // have checked with `parquet-tools column-index   -c string_col  ./alltypes_tiny_pages.parquet`
2600            // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows.
2601            if i != 351 {
2602                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2603            } else {
2604                // last page first row index is 7290, total row count is 7300
2605                // because first row start with zero, last page row count should be 10.
2606                assert_eq!(meta.num_levels, Some(10));
2607            }
2608            assert!(!meta.is_dict);
2609            vec.push(meta);
2610            let page = column_page_reader.get_next_page().unwrap().unwrap();
2611            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2612        }
2613
2614        //check read all pages.
2615        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2616        assert!(column_page_reader.get_next_page().unwrap().is_none());
2617
2618        assert_eq!(vec.len(), 352);
2619    }
2620
2621    #[test]
2622    fn test_fixed_length_index() {
2623        let message_type = "
2624        message test_schema {
2625          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2626        }
2627        ";
2628
2629        let schema = parse_message_type(message_type).unwrap();
2630        let mut out = Vec::with_capacity(1024);
2631        let mut writer =
2632            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2633
2634        let mut r = writer.next_row_group().unwrap();
2635        let mut c = r.next_column().unwrap().unwrap();
2636        c.typed::<FixedLenByteArrayType>()
2637            .write_batch(
2638                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2639                Some(&[1, 1, 0, 1]),
2640                None,
2641            )
2642            .unwrap();
2643        c.close().unwrap();
2644        r.close().unwrap();
2645        writer.close().unwrap();
2646
2647        let b = Bytes::from(out);
2648        let options = ReadOptionsBuilder::new().with_page_index().build();
2649        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2650        let index = reader.metadata().column_index().unwrap();
2651
2652        // 1 row group
2653        assert_eq!(index.len(), 1);
2654        let c = &index[0];
2655        // 1 column
2656        assert_eq!(c.len(), 1);
2657
2658        match &c[0] {
2659            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
2660                assert_eq!(v.num_pages(), 1);
2661                assert_eq!(v.null_count(0).unwrap(), 1);
2662                assert_eq!(v.min_value(0).unwrap(), &[0; 11]);
2663                assert_eq!(v.max_value(0).unwrap(), &[5; 11]);
2664            }
2665            _ => unreachable!(),
2666        }
2667    }
2668
2669    #[test]
2670    fn test_multi_gz() {
2671        let file = get_test_file("concatenated_gzip_members.parquet");
2672        let reader = SerializedFileReader::new(file).unwrap();
2673        let row_group_reader = reader.get_row_group(0).unwrap();
2674        match row_group_reader.get_column_reader(0).unwrap() {
2675            ColumnReader::Int64ColumnReader(mut reader) => {
2676                let mut buffer = Vec::with_capacity(1024);
2677                let mut def_levels = Vec::with_capacity(1024);
2678                let (num_records, num_values, num_levels) = reader
2679                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2680                    .unwrap();
2681
2682                assert_eq!(num_records, 513);
2683                assert_eq!(num_values, 513);
2684                assert_eq!(num_levels, 513);
2685
2686                let expected: Vec<i64> = (1..514).collect();
2687                assert_eq!(&buffer, &expected);
2688            }
2689            _ => unreachable!(),
2690        }
2691    }
2692
2693    #[test]
2694    fn test_byte_stream_split_extended() {
2695        let path = format!(
2696            "{}/byte_stream_split_extended.gzip.parquet",
2697            arrow::util::test_util::parquet_test_data(),
2698        );
2699        let file = File::open(path).unwrap();
2700        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2701
2702        // Use full schema as projected schema
2703        let mut iter = reader
2704            .get_row_iter(None)
2705            .expect("Failed to create row iterator");
2706
2707        let mut start = 0;
2708        let end = reader.metadata().file_metadata().num_rows();
2709
2710        let check_row = |row: Result<Row, ParquetError>| {
2711            assert!(row.is_ok());
2712            let r = row.unwrap();
2713            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2714            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2715            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2716            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2717            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2718            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2719            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2720        };
2721
2722        while start < end {
2723            match iter.next() {
2724                Some(row) => check_row(row),
2725                None => break,
2726            };
2727            start += 1;
2728        }
2729    }
2730
    /// End-to-end check that filtering row groups with a read predicate also
    /// filters the column/offset page indexes down to the retained groups.
    #[test]
    fn test_filtered_rowgroup_metadata() {
        let message_type = "
            message test_schema {
                REQUIRED INT32 a;
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());
        // Page-level statistics are required so a column index gets written.
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Page)
                .build(),
        );
        let mut file: File = tempfile::tempfile().unwrap();
        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
        let data = [1, 2, 3, 4, 5];

        // write 5 row groups
        for idx in 0..5 {
            // Scale the base data by (idx + 1) so each row group has distinct
            // min/max statistics.
            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
            let mut row_group_writer = file_writer.next_row_group().unwrap();
            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
                writer
                    .typed::<Int32Type>()
                    .write_batch(data_i.as_slice(), None, None)
                    .unwrap();
                writer.close().unwrap();
            }
            row_group_writer.close().unwrap();
            // NOTE(review): return value is discarded — presumably this only
            // exercises the accessor; confirm whether the call is needed.
            file_writer.flushed_row_groups();
        }
        let file_metadata = file_writer.close().unwrap();

        // 5 row groups of 5 rows each were written.
        assert_eq!(file_metadata.file_metadata().num_rows(), 25);
        assert_eq!(file_metadata.num_row_groups(), 5);

        // read only the 3rd row group
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        // check we got the expected row group
        assert_eq!(metadata.num_row_groups(), 1);
        assert_eq!(metadata.row_group(0).ordinal(), Some(2));

        // check we only got the relevant page indexes
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 1);
        assert_eq!(metadata.offset_index().unwrap().len(), 1);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();
        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
        let pg_idx = &col_idx[0][0];
        let off_idx_i = &off_idx[0][0];

        // test that we got the index matching the row group
        match pg_idx {
            ColumnIndexMetaData::INT32(int_idx) => {
                // Page 0's min/max must equal the column chunk statistics of
                // the retained row group (ordinal 2), proving the index was
                // not taken from a different group.
                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                assert_eq!(int_idx.min_value(0), Some(min).as_ref());
                assert_eq!(int_idx.max_value(0), Some(max).as_ref());
            }
            _ => panic!("wrong stats type"),
        }

        // check offset index matches too
        assert_eq!(
            off_idx_i.page_locations[0].offset,
            metadata.row_group(0).column(0).data_page_offset()
        );

        // read non-contiguous row groups
        let read_options = ReadOptionsBuilder::new()
            .with_page_index()
            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
            .build();
        let reader =
            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
                .unwrap();
        let metadata = reader.metadata();

        // check we got the expected row groups
        assert_eq!(metadata.num_row_groups(), 2);
        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
        assert_eq!(metadata.row_group(1).ordinal(), Some(3));

        // check we only got the relevant page indexes
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        assert_eq!(metadata.column_index().unwrap().len(), 2);
        assert_eq!(metadata.offset_index().unwrap().len(), 2);
        let col_idx = metadata.column_index().unwrap();
        let off_idx = metadata.offset_index().unwrap();

        // Repeat the per-group index/statistics consistency check for each
        // retained (odd-ordinal) row group.
        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
            let pg_idx = &col_idx_i[0];
            let off_idx_i = &off_idx[i][0];

            // test that we got the index matching the row group
            match pg_idx {
                ColumnIndexMetaData::INT32(int_idx) => {
                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
                    assert_eq!(int_idx.min_value(0), Some(min).as_ref());
                    assert_eq!(int_idx.max_value(0), Some(max).as_ref());
                }
                _ => panic!("wrong stats type"),
            }

            // check offset index matches too
            assert_eq!(
                off_idx_i.page_locations[0].offset,
                metadata.row_group(i).column(0).data_page_offset()
            );
        }
    }
2855
2856    #[test]
2857    fn test_reuse_schema() {
2858        let file = get_test_file("alltypes_plain.parquet");
2859        let file_reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
2860        let schema = file_reader.metadata().file_metadata().schema_descr_ptr();
2861        let expected = file_reader.metadata;
2862
2863        let options = ReadOptionsBuilder::new()
2864            .with_parquet_schema(schema)
2865            .build();
2866        let file_reader = SerializedFileReader::new_with_options(file, options).unwrap();
2867
2868        assert_eq!(expected.as_ref(), file_reader.metadata.as_ref());
2869        // Should have used the same schema instance
2870        assert!(Arc::ptr_eq(
2871            &expected.file_metadata().schema_descr_ptr(),
2872            &file_reader.metadata.file_metadata().schema_descr_ptr()
2873        ));
2874    }
2875
2876    #[test]
2877    fn test_read_unknown_logical_type() {
2878        let file = get_test_file("unknown-logical-type.parquet");
2879        let reader = SerializedFileReader::new(file).expect("Error opening file");
2880
2881        let schema = reader.metadata().file_metadata().schema_descr();
2882        assert_eq!(
2883            schema.column(0).logical_type_ref(),
2884            Some(&basic::LogicalType::String)
2885        );
2886        assert_eq!(
2887            schema.column(1).logical_type_ref(),
2888            Some(&basic::LogicalType::_Unknown { field_id: 2555 })
2889        );
2890        assert_eq!(schema.column(1).physical_type(), Type::BYTE_ARRAY);
2891
2892        let mut iter = reader
2893            .get_row_iter(None)
2894            .expect("Failed to create row iterator");
2895
2896        let mut num_rows = 0;
2897        while iter.next().is_some() {
2898            num_rows += 1;
2899        }
2900        assert_eq!(num_rows, reader.metadata().file_metadata().num_rows());
2901    }
2902}