// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)

use crate::basic::{PageType, Type};
use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{Codec, create_codec};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::{CryptoContext, read_and_decrypt};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::thrift::PageHeader;
use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
use crate::file::statistics;
use crate::file::{
    metadata::*,
    properties::{ReaderProperties, ReaderPropertiesPtr},
    reader::*,
};
#[cfg(feature = "encryption")]
use crate::parquet_thrift::ThriftSliceInputProtocol;
use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
use crate::record::Row;
use crate::record::reader::RowIter;
use crate::schema::types::{SchemaDescPtr, Type as SchemaType};
use bytes::Bytes;
use std::collections::VecDeque;
use std::{fs::File, io::Read, path::Path, sync::Arc};

impl TryFrom<File> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(file: File) -> Result<Self> {
        Self::new(file)
    }
}

impl TryFrom<&Path> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &Path) -> Result<Self> {
        let file = File::open(path)?;
        Self::try_from(file)
    }
}

impl TryFrom<String> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: String) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

impl TryFrom<&str> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &str) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

/// Conversion into a [`RowIter`]
/// using the full file schema over all row groups.
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        RowIter::from_file_into(Box::new(self))
    }
}

// ----------------------------------------------------------------------
// Implementations of file & row group readers

/// A serialized implementation for Parquet [`FileReader`].
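///
/// # Example
///
/// A minimal usage sketch; the file path here is hypothetical:
///
/// ```no_run
/// use std::fs::File;
/// use parquet::file::reader::{FileReader, SerializedFileReader};
///
/// let file = File::open("data.parquet").unwrap();
/// let reader = SerializedFileReader::new(file).unwrap();
/// println!("row groups: {}", reader.num_row_groups());
/// for row in reader.get_row_iter(None).unwrap() {
///     println!("{}", row.unwrap());
/// }
/// ```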
pub struct SerializedFileReader<R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: Arc<ParquetMetaData>,
    props: ReaderPropertiesPtr,
}

/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned.
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;

/// A builder for [`ReadOptions`].
/// Predicates added to the builder are chained with AND when filtering
/// row groups.
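///
/// # Example
///
/// A sketch of filtering out empty row groups and enabling the page index;
/// the file path and predicate are hypothetical:
///
/// ```no_run
/// use std::fs::File;
/// use parquet::file::metadata::RowGroupMetaData;
/// use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};
///
/// let file = File::open("data.parquet").unwrap();
/// let options = ReadOptionsBuilder::new()
///     // keep only row groups that contain at least one row
///     .with_predicate(Box::new(|rg: &RowGroupMetaData, _idx: usize| rg.num_rows() > 0))
///     .with_page_index()
///     .build();
/// let reader = SerializedFileReader::new_with_options(file, options).unwrap();
/// ```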
#[derive(Default)]
pub struct ReadOptionsBuilder {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: Option<ReaderProperties>,
    metadata_options: ParquetMetaDataOptions,
}

impl ReadOptionsBuilder {
    /// New builder
    pub fn new() -> Self {
        Self::default()
    }

    /// Add a predicate on row group metadata to the read options.
    /// Only row groups matching the predicate criteria will be scanned.
    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
        self.predicates.push(predicate);
        self
    }

    /// Add a range predicate that keeps only row groups whose midpoints fall
    /// within the closed-open range `[start..end)`, i.e. `{x | start <= x < end}`.
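    ///
    /// For example, `with_range(0, 4096)` keeps only row groups whose midpoint
    /// offset (the offset of the first dictionary or data page plus half the
    /// row group's compressed size) is less than 4096.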
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        assert!(start < end);
        let predicate = move |rg: &RowGroupMetaData, _: usize| {
            let mid = get_midpoint_offset(rg);
            mid >= start && mid < end
        };
        self.predicates.push(Box::new(predicate));
        self
    }

    /// Enable reading the page index structures described in
    /// "[Column Index] Layout to Support Page Skipping"
    ///
    /// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    pub fn with_page_index(mut self) -> Self {
        self.enable_page_index = true;
        self
    }

    /// Set the [`ReaderProperties`] configuration.
    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
        self.props = Some(properties);
        self
    }

    /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
    /// footer will be skipped.
    pub fn with_parquet_schema(mut self, schema: SchemaDescPtr) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    /// Seal the builder and return the read options
    pub fn build(self) -> ReadOptions {
        let props = self
            .props
            .unwrap_or_else(|| ReaderProperties::builder().build());
        ReadOptions {
            predicates: self.predicates,
            enable_page_index: self.enable_page_index,
            props,
            metadata_options: self.metadata_options,
        }
    }
}

/// A collection of options for reading a Parquet file.
///
/// Predicates are currently only supported on row group metadata.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: ReaderProperties,
    metadata_options: ParquetMetaDataOptions,
}

impl<R: 'static + ChunkReader> SerializedFileReader<R> {
    /// Creates a file reader from a Parquet file.
    /// Returns an error if the Parquet file does not exist or is corrupt.
    pub fn new(chunk_reader: R) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
        let props = Arc::new(ReaderProperties::builder().build());
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props,
        })
    }

    /// Creates a file reader from a Parquet file with read options.
    /// Returns an error if the Parquet file does not exist or is corrupt.
    #[allow(deprecated)]
    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
        let mut metadata_builder = ParquetMetaDataReader::new()
            .with_metadata_options(Some(options.metadata_options.clone()))
            .parse_and_finish(&chunk_reader)?
            .into_builder();
        let mut predicates = options.predicates;

        // Filter row groups based on the predicates
        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
            let mut keep = true;
            for predicate in &mut predicates {
                if !predicate(&rg_meta, i) {
                    keep = false;
                    break;
                }
            }
            if keep {
                metadata_builder = metadata_builder.add_row_group(rg_meta);
            }
        }

        let mut metadata = metadata_builder.build();

        // If page indexes are desired, build them with the filtered set of row groups
        if options.enable_page_index {
            let mut reader =
                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
            reader.read_page_indexes(&chunk_reader)?;
            metadata = reader.finish()?;
        }

        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props: Arc::new(options.props),
        })
    }
}

/// Get midpoint offset for a row group
fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
    let col = meta.column(0);
    let mut offset = col.data_page_offset();
    if let Some(dic_offset) = col.dictionary_page_offset() {
        if offset > dic_offset {
            offset = dic_offset
        }
    };
    offset + meta.compressed_size() / 2
}

impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
    fn metadata(&self) -> &ParquetMetaData {
        &self.metadata
    }

    fn num_row_groups(&self) -> usize {
        self.metadata.num_row_groups()
    }

    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
        let row_group_metadata = self.metadata.row_group(i);
        // Row groups should be processed sequentially.
        let props = Arc::clone(&self.props);
        let f = Arc::clone(&self.chunk_reader);
        Ok(Box::new(SerializedRowGroupReader::new(
            f,
            row_group_metadata,
            self.metadata.offset_index().map(|x| x[i].as_slice()),
            props,
        )?))
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_file(projection, self)
    }
}

/// A serialized implementation for Parquet [`RowGroupReader`].
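///
/// Typically obtained via [`FileReader::get_row_group`]; a usage sketch, with
/// a hypothetical file path:
///
/// ```no_run
/// use std::fs::File;
/// use parquet::column::page::PageReader;
/// use parquet::file::reader::{FileReader, RowGroupReader, SerializedFileReader};
///
/// let file = File::open("data.parquet").unwrap();
/// let reader = SerializedFileReader::new(file).unwrap();
/// let row_group = reader.get_row_group(0).unwrap();
/// let mut pages = row_group.get_column_page_reader(0).unwrap();
/// while let Some(page) = pages.get_next_page().unwrap() {
///     println!("page with {} values", page.num_values());
/// }
/// ```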
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: &'a RowGroupMetaData,
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    props: ReaderPropertiesPtr,
    bloom_filters: Vec<Option<Sbbf>>,
}

impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
    /// Creates a new row group reader from a file, row group metadata and custom config.
    pub fn new(
        chunk_reader: Arc<R>,
        metadata: &'a RowGroupMetaData,
        offset_index: Option<&'a [OffsetIndexMetaData]>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let bloom_filters = if props.read_bloom_filter() {
            metadata
                .columns()
                .iter()
                .map(|col| Sbbf::read_from_column_chunk(col, &*chunk_reader))
                .collect::<Result<Vec<_>>>()?
        } else {
            std::iter::repeat_n(None, metadata.columns().len()).collect()
        };
        Ok(Self {
            chunk_reader,
            metadata,
            offset_index,
            props,
            bloom_filters,
        })
    }
}

impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
    fn metadata(&self) -> &RowGroupMetaData {
        self.metadata
    }

    fn num_columns(&self) -> usize {
        self.metadata.num_columns()
    }

    // TODO: fix PARQUET-816
    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
        let col = self.metadata.column(i);

        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());

        let props = Arc::clone(&self.props);
        Ok(Box::new(SerializedPageReader::new_with_properties(
            Arc::clone(&self.chunk_reader),
            col,
            usize::try_from(self.metadata.num_rows())?,
            page_locations,
            props,
        )?))
    }

    /// Get the bloom filter for the `i`th column
    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
        self.bloom_filters[i].as_ref()
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>> {
        RowIter::from_row_group(projection, self)
    }
}

/// Decodes a [`Page`] from the provided `buffer`
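///
/// Verifies the page's CRC checksum when the `crc` feature is enabled and the
/// header carries one, decompresses the page body if a `decompressor` is
/// provided, and converts the Thrift page header into the corresponding
/// [`Page`] variant.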
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the 32-bit CRC checksum of the page
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // When processing a data page v2, the repetition and definition levels are
    // stored uncompressed at the front of the page, so depending on whether
    // compression is enabled for the page we must account for their combined
    // length ('offset') before decompressing.
    //
    // For all other page types the offset is always 0; the `true` flag means
    // that decompression will be applied if a decompressor is provided.
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        if header_v2.definition_levels_byte_length < 0
            || header_v2.repetition_levels_byte_length < 0
            || header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length
                > page_header.uncompressed_page_size
        {
            return Err(general_err!(
                "DataPage v2 header contains implausible values \
                    for definition_levels_byte_length ({}) \
                    and repetition_levels_byte_length ({}) \
                    given DataPage header provides uncompressed_page_size ({})",
                header_v2.definition_levels_byte_length,
                header_v2.repetition_levels_byte_length,
                page_header.uncompressed_page_size
            ));
        }
        offset = usize::try_from(
            header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length,
        )?;
        // When the is_compressed flag is missing, the page is considered compressed
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }

    let buffer = match decompressor {
        Some(decompressor) if can_decompress => {
            let uncompressed_page_size = usize::try_from(page_header.uncompressed_page_size)?;
            if offset > buffer.len() || offset > uncompressed_page_size {
                return Err(general_err!("Invalid page header"));
            }
            let decompressed_size = uncompressed_page_size - offset;
            let mut decompressed = Vec::with_capacity(uncompressed_page_size);
            decompressed.extend_from_slice(&buffer[..offset]);
            // decompressed size of zero corresponds to a page with no non-null values
            // see https://github.com/apache/parquet-format/blob/master/README.md#data-pages
            if decompressed_size > 0 {
                let compressed = &buffer[offset..];
                decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
            }

            if decompressed.len() != uncompressed_page_size {
                return Err(general_err!(
                    "Actual decompressed size doesn't match the expected one ({} vs {})",
                    decompressed.len(),
                    uncompressed_page_size
                ));
            }

            Bytes::from(decompressed)
        }
        _ => buffer,
    };

    let result = match page_header.r#type {
        PageType::DICTIONARY_PAGE => {
            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
                ParquetError::General("Missing dictionary page header".to_string())
            })?;
            let is_sorted = dict_header.is_sorted.unwrap_or(false);
            Page::DictionaryPage {
                buf: buffer,
                num_values: dict_header.num_values.try_into()?,
                encoding: dict_header.encoding,
                is_sorted,
            }
        }
        PageType::DATA_PAGE => {
            let header = page_header
                .data_page_header
                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
            Page::DataPage {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                def_level_encoding: header.definition_level_encoding,
                rep_level_encoding: header.repetition_level_encoding,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        PageType::DATA_PAGE_V2 => {
            let header = page_header
                .data_page_header_v2
                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
            let is_compressed = header.is_compressed.unwrap_or(true);
            Page::DataPageV2 {
                buf: buffer,
                num_values: header.num_values.try_into()?,
                encoding: header.encoding,
                num_nulls: header.num_nulls.try_into()?,
                num_rows: header.num_rows.try_into()?,
                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
                is_compressed,
                statistics: statistics::from_thrift_page_stats(physical_type, header.statistics)?,
            }
        }
        _ => {
            // Unsupported page types (e.g., INDEX_PAGE) cannot be decoded;
            // callers are expected to skip them.
            return Err(general_err!(
                "Page type {:?} is not supported",
                page_header.r#type
            ));
        }
    };

    Ok(result)
}

enum SerializedPageReaderState {
    Values {
        /// The current byte offset in the reader
        /// Note that offset is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        offset: u64,

        /// The length of the chunk in bytes
        /// Note that remaining_bytes is u64 (i.e., not usize) to support 32-bit architectures such as WASM
        remaining_bytes: u64,

        /// If the next page header has already been "peeked", it is cached here
        next_page_header: Option<Box<PageHeader>>,

        /// The index of the data page within this column chunk
        page_index: usize,

        /// Whether the next page is expected to be a dictionary page
        require_dictionary: bool,
    },
    Pages {
        /// Remaining page locations
        page_locations: VecDeque<PageLocation>,
        /// Remaining dictionary location if any
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
        /// The index of the data page within this column chunk
        page_index: usize,
    },
}

#[derive(Default)]
struct SerializedPageReaderContext {
    /// Controls decoding of page-level statistics
    read_stats: bool,
    /// Crypto context carrying objects required for decryption
    #[cfg(feature = "encryption")]
    crypto_context: Option<Arc<CryptoContext>>,
}

/// A serialized implementation for Parquet [`PageReader`].
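///
/// # Example
///
/// A sketch of reading every page of the first column chunk of the first row
/// group, with a hypothetical file path:
///
/// ```no_run
/// use std::{fs::File, sync::Arc};
/// use parquet::file::metadata::ParquetMetaDataReader;
/// use parquet::file::serialized_reader::SerializedPageReader;
///
/// let file = File::open("data.parquet").unwrap();
/// let metadata = ParquetMetaDataReader::new().parse_and_finish(&file).unwrap();
/// let row_group = metadata.row_group(0);
/// let pages = SerializedPageReader::new(
///     Arc::new(file),
///     row_group.column(0),
///     row_group.num_rows() as usize,
///     None, // no offset index available
/// ).unwrap();
/// for page in pages {
///     println!("{:?}", page.unwrap().page_type());
/// }
/// ```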
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader
    reader: Arc<R>,

    /// The compression codec for this column chunk. Only set if the chunk is
    /// compressed (i.e. the codec is not `UNCOMPRESSED`).
    decompressor: Option<Box<dyn Codec>>,

    /// Column chunk type.
    physical_type: Type,

    state: SerializedPageReaderState,

    context: SerializedPageReaderContext,
}

impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and metadata
    pub fn new(
        reader: Arc<R>,
        column_chunk_metadata: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(
            reader,
            column_chunk_metadata,
            total_rows,
            page_locations,
            props,
        )
    }

    /// No-op stub implementation used when encryption is disabled.
    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
    pub(crate) fn add_crypto_context(
        self,
        _rg_idx: usize,
        _column_idx: usize,
        _parquet_meta_data: &ParquetMetaData,
        _column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        Ok(self)
    }

    /// Adds any necessary crypto context to this page reader, if encryption is enabled.
    #[cfg(feature = "encryption")]
    pub(crate) fn add_crypto_context(
        mut self,
        rg_idx: usize,
        column_idx: usize,
        parquet_meta_data: &ParquetMetaData,
        column_chunk_metadata: &ColumnChunkMetaData,
    ) -> Result<SerializedPageReader<R>> {
        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
            return Ok(self);
        };
        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
            return Ok(self);
        };
        let crypto_context =
            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
        self.context.crypto_context = Some(Arc::new(crypto_context));
        Ok(self)
    }

    /// Creates a new serialized page reader with custom options.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                // If the offset of the first page doesn't match the start of the column chunk
                // then the preceding space must contain a dictionary page.
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                    page_index: 0,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start,
                remaining_bytes: len,
                next_page_header: None,
                page_index: 0,
                require_dictionary: meta.dictionary_page_offset().is_some(),
            },
        };
        let mut context = SerializedPageReaderContext::default();
        if props.read_page_stats() {
            context.read_stats = true;
        }
        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
            context,
        })
    }

    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
    /// Unlike page metadata, an offset can uniquely identify a page.
    ///
    /// This is used when reading parquet with a row filter, where we don't want to decompress
    /// a page twice. It allows us to check whether the next page has already been cached or read.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<u64>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as u64))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as u64))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn read_page_header_len<T: Read>(
        context: &SerializedPageReaderContext,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
        struct TrackedRead<R> {
            inner: R,
            bytes_read: usize,
        }

        impl<R: Read> Read for TrackedRead<R> {
            fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
                let v = self.inner.read(buf)?;
                self.bytes_read += v;
                Ok(v)
            }
        }

        let mut tracked = TrackedRead {
            inner: input,
            bytes_read: 0,
        };
        let header = context.read_page_header(&mut tracked, page_index, dictionary_page)?;
        Ok((tracked.bytes_read, header))
    }

    fn read_page_header_len_from_bytes(
        context: &SerializedPageReaderContext,
        buffer: &[u8],
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<(usize, PageHeader)> {
        let mut input = std::io::Cursor::new(buffer);
        let header = context.read_page_header(&mut input, page_index, dictionary_page)?;
        let header_len = input.position() as usize;
        Ok((header_len, header))
    }
}

#[cfg(not(feature = "encryption"))]
impl SerializedPageReaderContext {
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        _page_index: usize,
        _dictionary_page: bool,
    ) -> Result<PageHeader> {
        let mut prot = ThriftReadInputProtocol::new(input);
        if self.read_stats {
            Ok(PageHeader::read_thrift(&mut prot)?)
        } else {
            Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
        }
    }

    fn decrypt_page_data<T>(
        &self,
        buffer: T,
        _page_index: usize,
        _dictionary_page: bool,
    ) -> Result<T> {
        Ok(buffer)
    }
}

#[cfg(feature = "encryption")]
impl SerializedPageReaderContext {
    fn read_page_header<T: Read>(
        &self,
        input: &mut T,
        page_index: usize,
        dictionary_page: bool,
    ) -> Result<PageHeader> {
        match self.page_crypto_context(page_index, dictionary_page) {
            None => {
                let mut prot = ThriftReadInputProtocol::new(input);
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
            Some(page_crypto_context) => {
                let data_decryptor = page_crypto_context.data_decryptor();
                let aad = page_crypto_context.create_page_header_aad()?;

                let buf = read_and_decrypt(data_decryptor, input, aad.as_ref()).map_err(|_| {
                    ParquetError::General(format!(
                        "Error decrypting page header for column {}, decryption key may be wrong",
                        page_crypto_context.column_ordinal
                    ))
                })?;

                let mut prot = ThriftSliceInputProtocol::new(buf.as_slice());
                if self.read_stats {
                    Ok(PageHeader::read_thrift(&mut prot)?)
                } else {
                    Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
                }
            }
        }
    }

    fn decrypt_page_data<T>(&self, buffer: T, page_index: usize, dictionary_page: bool) -> Result<T>
    where
        T: AsRef<[u8]>,
        T: From<Vec<u8>>,
    {
        let page_crypto_context = self.page_crypto_context(page_index, dictionary_page);
        if let Some(page_crypto_context) = page_crypto_context {
            let decryptor = page_crypto_context.data_decryptor();
            let aad = page_crypto_context.create_page_aad()?;
            let decrypted = decryptor.decrypt(buffer.as_ref(), &aad)?;
            Ok(T::from(decrypted))
        } else {
            Ok(buffer)
        }
    }

    fn page_crypto_context(
        &self,
        page_index: usize,
        dictionary_page: bool,
    ) -> Option<Arc<CryptoContext>> {
        self.crypto_context.as_ref().map(|c| {
            Arc::new(if dictionary_page {
                c.for_dictionary_page()
            } else {
                c.with_page_ordinal(page_index)
            })
        })
    }
}

impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
    type Item = Result<Page>;

    fn next(&mut self) -> Option<Self::Item> {
        self.get_next_page().transpose()
    }
}

fn verify_page_header_len(header_len: usize, remaining_bytes: u64) -> Result<()> {
    if header_len as u64 > remaining_bytes {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

fn verify_page_size(
    compressed_size: i32,
    uncompressed_size: i32,
    remaining_bytes: u64,
) -> Result<()> {
    // The page's compressed size should not exceed the remaining bytes that are
    // available to read. The page's uncompressed size is the expected size
    // after decompression, which can never be negative.
    if compressed_size < 0 || compressed_size as u64 > remaining_bytes || uncompressed_size < 0 {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                    page_index,
                    require_dictionary,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset)?;
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len as u64;
                        *remaining -= header_len as u64;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    let data_start = *offset;
                    *offset += data_len as u64;
                    *remaining -= data_len as u64;

                    if header.r#type == PageType::INDEX_PAGE {
                        continue;
                    }

                    let buffer = self.reader.get_bytes(data_start, data_len)?;

                    let buffer =
                        self.context
                            .decrypt_page_data(buffer, *page_index, *require_dictionary)?;

                    let page = decode_page(
                        header,
                        buffer,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?;
                    if page.is_data_page() {
                        *page_index += 1;
                    } else if page.is_dictionary_page() {
                        *require_dictionary = false;
                    }
                    page
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    page_index,
                    ..
                } => {
                    let (front, is_dictionary_page) = match dictionary_page.take() {
                        Some(front) => (front, true),
                        None => match page_locations.pop_front() {
                            Some(front) => (front, false),
                            None => return Ok(None),
                        },
                    };

                    let page_len = usize::try_from(front.compressed_page_size)?;
                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let (offset, header) = Self::read_page_header_len_from_bytes(
                        &self.context,
                        buffer.as_ref(),
                        *page_index,
                        is_dictionary_page,
                    )?;
                    let bytes = buffer.slice(offset..);
                    let bytes =
                        self.context
                            .decrypt_page_data(bytes, *page_index, is_dictionary_page)?;

                    if !is_dictionary_page {
                        *page_index += 1;
                    }
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset)?;
                        let (header_len, header) = Self::read_page_header_len(
                            &self.context,
                            &mut read,
                            *page_index,
                            *require_dictionary,
                        )?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len as u64;
                        *remaining_bytes -= header_len as u64;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
                page_index: _,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
                page_index,
                require_dictionary,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The next page header has already been peeked, so just advance the offset
                    *offset += buffered_header.compressed_page_size as u64;
                    *remaining_bytes -= buffered_header.compressed_page_size as u64;
                } else {
                    let mut read = self.reader.get_read(*offset)?;
                    let (header_len, header) = Self::read_page_header_len(
                        &self.context,
                        &mut read,
                        *page_index,
                        *require_dictionary,
                    )?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    let data_page_size = header.compressed_page_size as u64;
                    *offset += header_len as u64 + data_page_size;
                    *remaining_bytes -= header_len as u64 + data_page_size;
                }
                if *require_dictionary {
                    *require_dictionary = false;
                } else {
                    *page_index += 1;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                page_index,
                ..
            } => {
                if dictionary_page.is_some() {
                    // If a dictionary page exists, consume it by taking it (sets to None)
                    dictionary_page.take();
                } else {
                    // If no dictionary page exists, simply pop the data page from page_locations
                    if page_locations.pop_front().is_some() {
                        *page_index += 1;
                    }
                }

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Buf;

    use crate::file::page_index::column_index::{
        ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
    };
    use crate::file::properties::{EnabledStatistics, WriterProperties};

    use crate::basic::{self, BoundaryOrder, ColumnOrder, Encoding, SortOrder};
    use crate::column::reader::ColumnReader;
    use crate::data_type::private::ParquetValueType;
    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
    use crate::file::metadata::thrift::DataPageHeaderV2;
    #[allow(deprecated)]
    use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
    use crate::file::writer::SerializedFileWriter;
    use crate::record::RowAccessor;
    use crate::schema::parser::parse_message_type;
    use crate::util::test_common::file_util::{get_test_file, get_test_path};

    use super::*;

    #[test]
    fn test_decode_page_invalid_offset() {
        let page_header = PageHeader {
            r#type: PageType::DATA_PAGE_V2,
            uncompressed_page_size: 10,
            compressed_page_size: 10,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: None,
            crc: None,
            data_page_header_v2: Some(DataPageHeaderV2 {
                num_nulls: 0,
                num_rows: 0,
                num_values: 0,
                encoding: Encoding::PLAIN,
                definition_levels_byte_length: 11,
                repetition_levels_byte_length: 0,
                is_compressed: None,
                statistics: None,
            }),
        };

        let buffer = Bytes::new();
        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
        assert!(
            err.to_string()
                .contains("DataPage v2 header contains implausible values")
        );
    }

    #[test]
    fn test_decode_unsupported_page() {
        let mut page_header = PageHeader {
            r#type: PageType::INDEX_PAGE,
            uncompressed_page_size: 10,
            compressed_page_size: 10,
            data_page_header: None,
            index_page_header: None,
            dictionary_page_header: None,
            crc: None,
            data_page_header_v2: None,
        };
        let buffer = Bytes::new();
        let err = decode_page(page_header.clone(), buffer.clone(), Type::INT32, None).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Parquet error: Page type INDEX_PAGE is not supported"
        );

        page_header.data_page_header_v2 = Some(DataPageHeaderV2 {
            num_nulls: 0,
            num_rows: 0,
            num_values: 0,
            encoding: Encoding::PLAIN,
            definition_levels_byte_length: 11,
            repetition_levels_byte_length: 0,
            is_compressed: None,
            statistics: None,
        });
        let err = decode_page(page_header, buffer, Type::INT32, None).unwrap_err();
        assert!(
            err.to_string()
                .contains("DataPage v2 header contains implausible values")
        );
    }

    #[test]
    fn test_cursor_and_file_has_the_same_behaviour() {
        let mut buf: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut buf)
            .unwrap();
        let cursor = Bytes::from(buf);
        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_from_file = SerializedFileReader::new(test_file).unwrap();

        let file_iter = read_from_file.get_row_iter(None).unwrap();
        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();

        for (a, b) in file_iter.zip(cursor_iter) {
            assert_eq!(a.unwrap(), b.unwrap())
        }
    }

    #[test]
    fn test_file_reader_try_from() {
        // Valid file path
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        // Invalid file path
        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }

    #[test]
    fn test_file_reader_into_iter() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let iter = reader.into_iter();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[test]
    fn test_file_reader_into_iter_project() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let schema = "message schema { OPTIONAL INT32 id; }";
        let proj = parse_message_type(schema).ok();
        let iter = reader.into_iter().project(proj).unwrap();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[test]
    fn test_reuse_file_chunk() {
        // This test covers the case of maintaining the correct start position in a file
        // stream for each column reader after initializing and moving to the next one
        // (without necessarily reading the entire column).
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file).unwrap();
        let row_group = reader.get_row_group(0).unwrap();

        let mut page_readers = Vec::new();
        for i in 0..row_group.num_columns() {
            page_readers.push(row_group.get_column_page_reader(i).unwrap());
        }

        // Now buffer each col reader, we do not expect any failures like:
        // General("underlying Thrift error: end of file")
        for mut page_reader in page_readers {
            assert!(page_reader.get_next_page().is_ok());
        }
    }

    #[test]
    fn test_file_reader() {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
        );
        assert!(file_metadata.key_value_metadata().is_none());
        assert_eq!(file_metadata.num_rows(), 8);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        // Test contents in row group metadata
        let row_group_metadata = metadata.row_group(0);
        assert_eq!(row_group_metadata.num_columns(), 11);
        assert_eq!(row_group_metadata.num_rows(), 8);
        assert_eq!(row_group_metadata.total_byte_size(), 671);
        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 32);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPage {
                    buf,
                    num_values,
                    encoding,
                    def_level_encoding,
                    rep_level_encoding,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 11);
                    assert_eq!(num_values, 8);
                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
                    assert_eq!(def_level_encoding, Encoding::RLE);
                    #[allow(deprecated)]
                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
                    assert!(statistics.is_none());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Some(page) = page_reader_0.get_next_page().unwrap() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_none()); // page stats are no longer read
                    true
1496                }
1497                _ => false,
1498            };
1499            assert!(is_expected_page);
1500            page_count += 1;
1501        }
1502        assert_eq!(page_count, 2);
1503    }
1504
1505    #[test]
1506    fn test_file_reader_empty_compressed_datapage_v2() {
1507        // this file has a compressed data page that decompresses to 0 bytes
1508        let test_file = get_test_file("page_v2_empty_compressed.parquet");
1509        let reader_result = SerializedFileReader::new(test_file);
1510        assert!(reader_result.is_ok());
1511        let reader = reader_result.unwrap();
1512
1513        // Test contents in Parquet metadata
1514        let metadata = reader.metadata();
1515        assert_eq!(metadata.num_row_groups(), 1);
1516
1517        // Test contents in file metadata
1518        let file_metadata = metadata.file_metadata();
1519        assert!(file_metadata.created_by().is_some());
1520        assert_eq!(
1521            file_metadata.created_by().unwrap(),
1522            "parquet-cpp-arrow version 14.0.2"
1523        );
1524        assert!(file_metadata.key_value_metadata().is_some());
1525        assert_eq!(
1526            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1527            1
1528        );
1529
1530        assert_eq!(file_metadata.num_rows(), 10);
1531        assert_eq!(file_metadata.version(), 2);
1532        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1533        assert_eq!(
1534            file_metadata.column_orders(),
1535            Some(vec![expected_order].as_ref())
1536        );
1537
1538        let row_group_metadata = metadata.row_group(0);
1539
1540        // Check each column order
1541        for i in 0..row_group_metadata.num_columns() {
1542            assert_eq!(file_metadata.column_order(i), expected_order);
1543        }
1544
1545        // Test row group reader
1546        let row_group_reader_result = reader.get_row_group(0);
1547        assert!(row_group_reader_result.is_ok());
1548        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1549        assert_eq!(
1550            row_group_reader.num_columns(),
1551            row_group_metadata.num_columns()
1552        );
1553        assert_eq!(
1554            row_group_reader.metadata().total_byte_size(),
1555            row_group_metadata.total_byte_size()
1556        );
1557
1558        // Test page readers
1559        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1560        assert!(page_reader_0_result.is_ok());
1561        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1562        let mut page_count = 0;
1563        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1564            let is_expected_page = match page {
1565                Page::DictionaryPage {
1566                    buf,
1567                    num_values,
1568                    encoding,
1569                    is_sorted,
1570                } => {
1571                    assert_eq!(buf.len(), 0);
1572                    assert_eq!(num_values, 0);
1573                    assert_eq!(encoding, Encoding::PLAIN);
1574                    assert!(!is_sorted);
1575                    true
1576                }
1577                Page::DataPageV2 {
1578                    buf,
1579                    num_values,
1580                    encoding,
1581                    num_nulls,
1582                    num_rows,
1583                    def_levels_byte_len,
1584                    rep_levels_byte_len,
1585                    is_compressed,
1586                    statistics,
1587                } => {
1588                    assert_eq!(buf.len(), 3);
1589                    assert_eq!(num_values, 10);
1590                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
1591                    assert_eq!(num_nulls, 10);
1592                    assert_eq!(num_rows, 10);
1593                    assert_eq!(def_levels_byte_len, 2);
1594                    assert_eq!(rep_levels_byte_len, 0);
1595                    assert!(is_compressed);
1596                    assert!(statistics.is_none()); // page stats are no longer read
1597                    true
1598                }
1599                _ => false,
1600            };
1601            assert!(is_expected_page);
1602            page_count += 1;
1603        }
1604        assert_eq!(page_count, 2);
1605    }
1606
1607    #[test]
1608    fn test_file_reader_empty_datapage_v2() {
1609        // this file has a zero-byte compressed data page that decompresses to 0 bytes
1610        let test_file = get_test_file("datapage_v2_empty_datapage.snappy.parquet");
1611        let reader_result = SerializedFileReader::new(test_file);
1612        assert!(reader_result.is_ok());
1613        let reader = reader_result.unwrap();
1614
1615        // Test contents in Parquet metadata
1616        let metadata = reader.metadata();
1617        assert_eq!(metadata.num_row_groups(), 1);
1618
1619        // Test contents in file metadata
1620        let file_metadata = metadata.file_metadata();
1621        assert!(file_metadata.created_by().is_some());
1622        assert_eq!(
1623            file_metadata.created_by().unwrap(),
1624            "parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)"
1625        );
1626        assert!(file_metadata.key_value_metadata().is_some());
1627        assert_eq!(
1628            file_metadata.key_value_metadata().to_owned().unwrap().len(),
1629            2
1630        );
1631
1632        assert_eq!(file_metadata.num_rows(), 1);
1633        assert_eq!(file_metadata.version(), 1);
1634        let expected_order = ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED);
1635        assert_eq!(
1636            file_metadata.column_orders(),
1637            Some(vec![expected_order].as_ref())
1638        );
1639
1640        let row_group_metadata = metadata.row_group(0);
1641
1642        // Check each column order
1643        for i in 0..row_group_metadata.num_columns() {
1644            assert_eq!(file_metadata.column_order(i), expected_order);
1645        }
1646
1647        // Test row group reader
1648        let row_group_reader_result = reader.get_row_group(0);
1649        assert!(row_group_reader_result.is_ok());
1650        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1651        assert_eq!(
1652            row_group_reader.num_columns(),
1653            row_group_metadata.num_columns()
1654        );
1655        assert_eq!(
1656            row_group_reader.metadata().total_byte_size(),
1657            row_group_metadata.total_byte_size()
1658        );
1659
1660        // Test page readers
1661        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1662        assert!(page_reader_0_result.is_ok());
1663        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1664        let mut page_count = 0;
1665        while let Some(page) = page_reader_0.get_next_page().unwrap() {
1666            let is_expected_page = match page {
1667                Page::DataPageV2 {
1668                    buf,
1669                    num_values,
1670                    encoding,
1671                    num_nulls,
1672                    num_rows,
1673                    def_levels_byte_len,
1674                    rep_levels_byte_len,
1675                    is_compressed,
1676                    statistics,
1677                } => {
1678                    assert_eq!(buf.len(), 2);
1679                    assert_eq!(num_values, 1);
1680                    assert_eq!(encoding, Encoding::PLAIN);
1681                    assert_eq!(num_nulls, 1);
1682                    assert_eq!(num_rows, 1);
1683                    assert_eq!(def_levels_byte_len, 2);
1684                    assert_eq!(rep_levels_byte_len, 0);
1685                    assert!(is_compressed);
1686                    assert!(statistics.is_none());
1687                    true
1688                }
1689                _ => false,
1690            };
1691            assert!(is_expected_page);
1692            page_count += 1;
1693        }
1694        assert_eq!(page_count, 1);
1695    }
1696
1697    fn get_serialized_page_reader<R: ChunkReader>(
1698        file_reader: &SerializedFileReader<R>,
1699        row_group: usize,
1700        column: usize,
1701    ) -> Result<SerializedPageReader<R>> {
1702        let row_group = {
1703            let row_group_metadata = file_reader.metadata.row_group(row_group);
1704            let props = Arc::clone(&file_reader.props);
1705            let f = Arc::clone(&file_reader.chunk_reader);
1706            SerializedRowGroupReader::new(
1707                f,
1708                row_group_metadata,
1709                file_reader
1710                    .metadata
1711                    .offset_index()
1712                    .map(|x| x[row_group].as_slice()),
1713                props,
1714            )?
1715        };
1716
1717        let col = row_group.metadata.column(column);
1718
1719        let page_locations = row_group
1720            .offset_index
1721            .map(|x| x[column].page_locations.clone());
1722
1723        let props = Arc::clone(&row_group.props);
1724        SerializedPageReader::new_with_properties(
1725            Arc::clone(&row_group.chunk_reader),
1726            col,
1727            usize::try_from(row_group.metadata.num_rows())?,
1728            page_locations,
1729            props,
1730        )
1731    }
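    // A minimal usage sketch for the helper above: build a page reader for
    // (row group 0, column 0) of an already-open `SerializedFileReader` named
    // `reader` (a hypothetical binding) and drain its pages.
    //
    //     let mut pages = get_serialized_page_reader(&reader, 0, 0)?;
    //     while let Some(page) = pages.get_next_page()? {
    //         let _ = page.num_values();
    //     }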
1732
1733    #[test]
1734    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
1735        let test_file = get_test_file("alltypes_plain.parquet");
1736        let reader = SerializedFileReader::new(test_file)?;
1737
1738        let mut offset_set = HashSet::new();
1739        let num_row_groups = reader.metadata.num_row_groups();
1740        for row_group in 0..num_row_groups {
1741            let num_columns = reader.metadata.row_group(row_group).num_columns();
1742            for column in 0..num_columns {
1743                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;
1744
1745                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
1746                    match &page_reader.state {
1747                        SerializedPageReaderState::Pages {
1748                            page_locations,
1749                            dictionary_page,
1750                            ..
1751                        } => {
1752                            if let Some(page) = dictionary_page {
1753                                assert_eq!(page.offset as u64, page_offset);
1754                            } else if let Some(page) = page_locations.front() {
1755                                assert_eq!(page.offset as u64, page_offset);
1756                            } else {
1757                                unreachable!()
1758                            }
1759                        }
1760                        SerializedPageReaderState::Values {
1761                            offset,
1762                            next_page_header,
1763                            ..
1764                        } => {
1765                            assert!(next_page_header.is_some());
1766                            assert_eq!(*offset, page_offset);
1767                        }
1768                    }
1769                    let page = page_reader.get_next_page()?;
1770                    assert!(page.is_some());
1771                    let newly_inserted = offset_set.insert(page_offset);
1772                    assert!(newly_inserted);
1773                }
1774            }
1775        }
1776
1777        Ok(())
1778    }
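    // In sketch form, the contract exercised above: `peek_next_page_offset`
    // must report exactly the byte offset that the following `get_next_page`
    // call consumes, whether the reader is driven by the offset index
    // (`Pages` state) or by parsing page headers inline (`Values` state).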
1779
1780    #[test]
1781    fn test_page_iterator() {
1782        let file = get_test_file("alltypes_plain.parquet");
1783        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1784
1785        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();
1786
1787        // read first page
1788        let page = page_iterator.next();
1789        assert!(page.is_some());
1790        assert!(page.unwrap().is_ok());
1791
1792        // reach end of file
1793        let page = page_iterator.next();
1794        assert!(page.is_none());
1795
1796        let row_group_indices = Box::new(0..1);
1797        let mut page_iterator =
1798            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();
1799
1800        // read first page
1801        let page = page_iterator.next();
1802        assert!(page.is_some());
1803        assert!(page.unwrap().is_ok());
1804
1805        // reach end of file
1806        let page = page_iterator.next();
1807        assert!(page.is_none());
1808    }
1809
1810    #[test]
1811    fn test_file_reader_key_value_metadata() {
1812        let file = get_test_file("binary.parquet");
1813        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1814
1815        let metadata = file_reader
1816            .metadata
1817            .file_metadata()
1818            .key_value_metadata()
1819            .unwrap();
1820
1821        assert_eq!(metadata.len(), 3);
1822
1823        assert_eq!(metadata[0].key, "parquet.proto.descriptor");
1824
1825        assert_eq!(metadata[1].key, "writer.model.name");
1826        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));
1827
1828        assert_eq!(metadata[2].key, "parquet.proto.class");
1829        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
1830    }
1831
1832    #[test]
1833    fn test_file_reader_optional_metadata() {
1834        // file with optional metadata: bloom filters, encoding stats, column index and offset index.
1835        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
1836        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
1837
1838        let row_group_metadata = file_reader.metadata.row_group(0);
1839        let col0_metadata = row_group_metadata.column(0);
1840
1841        // test optional bloom filter offset
1842        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);
1843
1844        // test page encoding stats
1845        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];
1846
1847        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
1848        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
1849        assert_eq!(page_encoding_stats.count, 1);
1850
1851        // test optional column index offset
1852        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
1853        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);
1854
1855        // test optional offset index offset
1856        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
1857        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
1858    }
1859
1860    #[test]
1861    fn test_file_reader_with_no_filter() -> Result<()> {
1862        let test_file = get_test_file("alltypes_plain.parquet");
1863        let origin_reader = SerializedFileReader::new(test_file)?;
1864        // test initial number of row groups
1865        let metadata = origin_reader.metadata();
1866        assert_eq!(metadata.num_row_groups(), 1);
1867        Ok(())
1868    }
1869
1870    #[test]
1871    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1872        let test_file = get_test_file("alltypes_plain.parquet");
1873        let read_options = ReadOptionsBuilder::new()
1874            .with_predicate(Box::new(|_, _| false))
1875            .build();
1876        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1877        let metadata = reader.metadata();
1878        assert_eq!(metadata.num_row_groups(), 0);
1879        Ok(())
1880    }
1881
1882    #[test]
1883    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1884        let test_file = get_test_file("alltypes_plain.parquet");
1885        let origin_reader = SerializedFileReader::new(test_file)?;
1886        // test initial number of row groups
1887        let metadata = origin_reader.metadata();
1888        assert_eq!(metadata.num_row_groups(), 1);
1889        let mid = get_midpoint_offset(metadata.row_group(0));
1890
1891        let test_file = get_test_file("alltypes_plain.parquet");
1892        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1893        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1894        let metadata = reader.metadata();
1895        assert_eq!(metadata.num_row_groups(), 1);
1896
1897        let test_file = get_test_file("alltypes_plain.parquet");
1898        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1899        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1900        let metadata = reader.metadata();
1901        assert_eq!(metadata.num_row_groups(), 0);
1902        Ok(())
1903    }
1904
1905    #[test]
1906    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1907        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1908        let origin_reader = SerializedFileReader::new(test_file)?;
1909        let metadata = origin_reader.metadata();
1910        let mid = get_midpoint_offset(metadata.row_group(0));
1911
1912        // true, true predicate
1913        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1914        let read_options = ReadOptionsBuilder::new()
1915            .with_page_index()
1916            .with_predicate(Box::new(|_, _| true))
1917            .with_range(mid, mid + 1)
1918            .build();
1919        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1920        let metadata = reader.metadata();
1921        assert_eq!(metadata.num_row_groups(), 1);
1922        assert_eq!(metadata.column_index().unwrap().len(), 1);
1923        assert_eq!(metadata.offset_index().unwrap().len(), 1);
1924
1925        // true, false predicate
1926        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1927        let read_options = ReadOptionsBuilder::new()
1928            .with_page_index()
1929            .with_predicate(Box::new(|_, _| true))
1930            .with_range(0, mid)
1931            .build();
1932        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1933        let metadata = reader.metadata();
1934        assert_eq!(metadata.num_row_groups(), 0);
1935        assert!(metadata.column_index().is_none());
1936        assert!(metadata.offset_index().is_none());
1937
1938        // false, true predicate
1939        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1940        let read_options = ReadOptionsBuilder::new()
1941            .with_page_index()
1942            .with_predicate(Box::new(|_, _| false))
1943            .with_range(mid, mid + 1)
1944            .build();
1945        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1946        let metadata = reader.metadata();
1947        assert_eq!(metadata.num_row_groups(), 0);
1948        assert!(metadata.column_index().is_none());
1949        assert!(metadata.offset_index().is_none());
1950
1951        // false, false predicate
1952        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1953        let read_options = ReadOptionsBuilder::new()
1954            .with_page_index()
1955            .with_predicate(Box::new(|_, _| false))
1956            .with_range(0, mid)
1957            .build();
1958        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1959        let metadata = reader.metadata();
1960        assert_eq!(metadata.num_row_groups(), 0);
1961        assert!(metadata.column_index().is_none());
1962        assert!(metadata.offset_index().is_none());
1963        Ok(())
1964    }
1965
1966    #[test]
1967    fn test_file_reader_invalid_metadata() {
1968        let data = [
1969            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1970            80, 65, 82, 49,
1971        ];
1972        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1973        assert_eq!(
1974            ret.err().unwrap().to_string(),
1975            "Parquet error: Received empty union from remote ColumnOrder"
1976        );
1977    }
1978
1979    #[test]
1980    // The page index info below was obtained with the Java parquet-tools:
1981    // ```
1982    // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
1983    // row group 0:
1984    // column index for column String:
1985    // Boundary order: ASCENDING
1986    // page-0  :
1987    // null count                 min                                  max
1988    // 0                          Hello                                today
1989    //
1990    // offset index for column String:
1991    // page-0   :
1992    // offset   compressed size       first row index
1993    // 4               152                     0
1994    // ```
1995    //
1996    fn test_page_index_reader() {
1997        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
1998        let builder = ReadOptionsBuilder::new();
1999        // enable reading the page index
2000        let options = builder.with_page_index().build();
2001        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2002        let reader = reader_result.unwrap();
2003
2004        // Test contents in Parquet metadata
2005        let metadata = reader.metadata();
2006        assert_eq!(metadata.num_row_groups(), 1);
2007
2008        let column_index = metadata.column_index().unwrap();
2009
2010        // only one row group
2011        assert_eq!(column_index.len(), 1);
2012        let index = if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][0] {
2013            index
2014        } else {
2015            unreachable!()
2016        };
2017
2018        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
2019
2020        // only one page
2021        assert_eq!(index.num_pages(), 1);
2022
2023        let min = index.min_value(0).unwrap();
2024        let max = index.max_value(0).unwrap();
2025        assert_eq!(b"Hello", min.as_bytes());
2026        assert_eq!(b"today", max.as_bytes());
2027
2028        let offset_indexes = metadata.offset_index().unwrap();
2029        // only one row group
2030        assert_eq!(offset_indexes.len(), 1);
2031        let offset_index = &offset_indexes[0];
2032        let page_offset = &offset_index[0].page_locations()[0];
2033
2034        assert_eq!(4, page_offset.offset);
2035        assert_eq!(152, page_offset.compressed_page_size);
2036        assert_eq!(0, page_offset.first_row_index);
2037    }
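    // Typical use of these indexes, in sketch form: compare a predicate
    // against `index.min_value(p)` / `index.max_value(p)` to prune pages,
    // then seek to the surviving pages via `page_locations()[p].offset`.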
2038
2039    #[test]
2040    #[allow(deprecated)]
2041    fn test_page_index_reader_out_of_order() {
2042        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2043        let options = ReadOptionsBuilder::new().with_page_index().build();
2044        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
2045        let metadata = reader.metadata();
2046
2047        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2048        let columns = metadata.row_group(0).columns();
2049        let reversed: Vec<_> = columns.iter().cloned().rev().collect();
2050
2051        let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
2052        let mut b = read_columns_indexes(&test_file, &reversed)
2053            .unwrap()
2054            .unwrap();
2055        b.reverse();
2056        assert_eq!(a, b);
2057
2058        let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
2059        let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
2060        b.reverse();
2061        assert_eq!(a, b);
2062    }
2063
2064    #[test]
2065    fn test_page_index_reader_all_type() {
2066        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2067        let builder = ReadOptionsBuilder::new();
2068        // enable reading the page index
2069        let options = builder.with_page_index().build();
2070        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2071        let reader = reader_result.unwrap();
2072
2073        // Test contents in Parquet metadata
2074        let metadata = reader.metadata();
2075        assert_eq!(metadata.num_row_groups(), 1);
2076
2077        let column_index = metadata.column_index().unwrap();
2078        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
2079
2080        // only one row group
2081        assert_eq!(column_index.len(), 1);
2082        let row_group_metadata = metadata.row_group(0);
2083
2084        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
2085        assert!(!&column_index[0][0].is_sorted());
2086        let boundary_order = &column_index[0][0].get_boundary_order();
2087        assert!(boundary_order.is_some());
2088        assert!(matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED));
2089        if let ColumnIndexMetaData::INT32(index) = &column_index[0][0] {
2090            check_native_page_index(
2091                index,
2092                325,
2093                get_row_group_min_max_bytes(row_group_metadata, 0),
2094                BoundaryOrder::UNORDERED,
2095            );
2096            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
2097        } else {
2098            unreachable!()
2099        };
2100        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
2101        assert!(&column_index[0][1].is_sorted());
2102        if let ColumnIndexMetaData::BOOLEAN(index) = &column_index[0][1] {
2103            assert_eq!(index.num_pages(), 82);
2104            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
2105        } else {
2106            unreachable!()
2107        };
2108        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2109        assert!(&column_index[0][2].is_sorted());
2110        if let ColumnIndexMetaData::INT32(index) = &column_index[0][2] {
2111            check_native_page_index(
2112                index,
2113                325,
2114                get_row_group_min_max_bytes(row_group_metadata, 2),
2115                BoundaryOrder::ASCENDING,
2116            );
2117            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
2118        } else {
2119            unreachable!()
2120        };
2121        //col4->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2122        assert!(&column_index[0][3].is_sorted());
2123        if let ColumnIndexMetaData::INT32(index) = &column_index[0][3] {
2124            check_native_page_index(
2125                index,
2126                325,
2127                get_row_group_min_max_bytes(row_group_metadata, 3),
2128                BoundaryOrder::ASCENDING,
2129            );
2130            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
2131        } else {
2132            unreachable!()
2133        };
2134        //col5->int_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2135        assert!(&column_index[0][4].is_sorted());
2136        if let ColumnIndexMetaData::INT32(index) = &column_index[0][4] {
2137            check_native_page_index(
2138                index,
2139                325,
2140                get_row_group_min_max_bytes(row_group_metadata, 4),
2141                BoundaryOrder::ASCENDING,
2142            );
2143            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
2144        } else {
2145            unreachable!()
2146        };
2147        //col6->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
2148        assert!(!&column_index[0][5].is_sorted());
2149        if let ColumnIndexMetaData::INT64(index) = &column_index[0][5] {
2150            check_native_page_index(
2151                index,
2152                528,
2153                get_row_group_min_max_bytes(row_group_metadata, 5),
2154                BoundaryOrder::UNORDERED,
2155            );
2156            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
2157        } else {
2158            unreachable!()
2159        };
2160        //col7->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
2161        assert!(&column_index[0][6].is_sorted());
2162        if let ColumnIndexMetaData::FLOAT(index) = &column_index[0][6] {
2163            check_native_page_index(
2164                index,
2165                325,
2166                get_row_group_min_max_bytes(row_group_metadata, 6),
2167                BoundaryOrder::ASCENDING,
2168            );
2169            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
2170        } else {
2171            unreachable!()
2172        };
2173        //col8->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
2174        assert!(!&column_index[0][7].is_sorted());
2175        if let ColumnIndexMetaData::DOUBLE(index) = &column_index[0][7] {
2176            check_native_page_index(
2177                index,
2178                528,
2179                get_row_group_min_max_bytes(row_group_metadata, 7),
2180                BoundaryOrder::UNORDERED,
2181            );
2182            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
2183        } else {
2184            unreachable!()
2185        };
2186        //col9->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
2187        assert!(!&column_index[0][8].is_sorted());
2188        if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][8] {
2189            check_byte_array_page_index(
2190                index,
2191                974,
2192                get_row_group_min_max_bytes(row_group_metadata, 8),
2193                BoundaryOrder::UNORDERED,
2194            );
2195            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
2196        } else {
2197            unreachable!()
2198        };
2199        //col10->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
2200        assert!(&column_index[0][9].is_sorted());
2201        if let ColumnIndexMetaData::BYTE_ARRAY(index) = &column_index[0][9] {
2202            check_byte_array_page_index(
2203                index,
2204                352,
2205                get_row_group_min_max_bytes(row_group_metadata, 9),
2206                BoundaryOrder::ASCENDING,
2207            );
2208            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
2209        } else {
2210            unreachable!()
2211        };
2212        //col11->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
2213        // Note: per-page min/max values do not exist for this column.
2214        assert!(!&column_index[0][10].is_sorted());
2215        if let ColumnIndexMetaData::NONE = &column_index[0][10] {
2216            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
2217        } else {
2218            unreachable!()
2219        };
2220        //col12->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
2221        assert!(&column_index[0][11].is_sorted());
2222        if let ColumnIndexMetaData::INT32(index) = &column_index[0][11] {
2223            check_native_page_index(
2224                index,
2225                325,
2226                get_row_group_min_max_bytes(row_group_metadata, 11),
2227                BoundaryOrder::ASCENDING,
2228            );
2229            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
2230        } else {
2231            unreachable!()
2232        };
2233        //col13->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
2234        assert!(!&column_index[0][12].is_sorted());
2235        if let ColumnIndexMetaData::INT32(index) = &column_index[0][12] {
2236            check_native_page_index(
2237                index,
2238                325,
2239                get_row_group_min_max_bytes(row_group_metadata, 12),
2240                BoundaryOrder::UNORDERED,
2241            );
2242            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
2243        } else {
2244            unreachable!()
2245        };
2246    }
2247
2248    fn check_native_page_index<T: ParquetValueType>(
2249        row_group_index: &PrimitiveColumnIndex<T>,
2250        page_size: usize,
2251        min_max: (&[u8], &[u8]),
2252        boundary_order: BoundaryOrder,
2253    ) {
2254        assert_eq!(row_group_index.num_pages() as usize, page_size);
2255        assert_eq!(row_group_index.boundary_order, boundary_order);
2256        assert!(row_group_index.min_values().iter().all(|x| {
2257            x >= &T::try_from_le_slice(min_max.0).unwrap()
2258                && x <= &T::try_from_le_slice(min_max.1).unwrap()
2259        }));
2260    }
2261
2262    fn check_byte_array_page_index(
2263        row_group_index: &ByteArrayColumnIndex,
2264        page_size: usize,
2265        min_max: (&[u8], &[u8]),
2266        boundary_order: BoundaryOrder,
2267    ) {
2268        assert_eq!(row_group_index.num_pages() as usize, page_size);
2269        assert_eq!(row_group_index.boundary_order, boundary_order);
2270        for i in 0..row_group_index.num_pages() as usize {
2271            let x = row_group_index.min_value(i).unwrap();
2272            assert!(x >= min_max.0 && x <= min_max.1);
2273        }
2274    }
2275
2276    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
2277        let statistics = r.column(col_num).statistics().unwrap();
2278        (
2279            statistics.min_bytes_opt().unwrap_or_default(),
2280            statistics.max_bytes_opt().unwrap_or_default(),
2281        )
2282    }
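    // Note: the min/max bytes above are the plain little-endian encodings from
    // the column chunk statistics; `check_native_page_index` converts them
    // back to native values before comparing. A sketch, assuming an INT32
    // column:
    //
    //     let (min, max) = get_row_group_min_max_bytes(row_group_metadata, 0);
    //     let min_i32 = i32::try_from_le_slice(min)?;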
2283
2284    #[test]
2285    fn test_skip_next_page_with_dictionary_page() {
2286        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2287        let builder = ReadOptionsBuilder::new();
2288        // enable reading the page index
2289        let options = builder.with_page_index().build();
2290        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2291        let reader = reader_result.unwrap();
2292
2293        let row_group_reader = reader.get_row_group(0).unwrap();
2294
2295        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
2296        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2297
2298        let mut vec = vec![];
2299
2300        // Step 1: Peek and ensure dictionary page is correctly identified
2301        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2302        assert!(meta.is_dict);
2303
2304        // Step 2: Call skip_next_page to skip the dictionary page
2305        column_page_reader.skip_next_page().unwrap();
2306
2307        // Step 3: Read the next data page after skipping the dictionary page
2308        let page = column_page_reader.get_next_page().unwrap().unwrap();
2309        assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2310
2311        // Step 4: Continue reading remaining data pages and verify correctness
2312        for _i in 0..351 {
2313            // 352 total pages, 1 dictionary page is skipped
2314            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2315            assert!(!meta.is_dict); // Verify no dictionary page here
2316            vec.push(meta);
2317
2318            let page = column_page_reader.get_next_page().unwrap().unwrap();
2319            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2320        }
2321
2322        // Step 5: Check if all pages are read
2323        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2324        assert!(column_page_reader.get_next_page().unwrap().is_none());
2325
2326        // Step 6: Verify the number of data pages read (should be 351 data pages)
2327        assert_eq!(vec.len(), 351);
2328    }
2329
2330    #[test]
2331    fn test_skip_page_with_offset_index() {
2332        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2333        let builder = ReadOptionsBuilder::new();
2334        // enable reading the page index
2335        let options = builder.with_page_index().build();
2336        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2337        let reader = reader_result.unwrap();
2338
2339        let row_group_reader = reader.get_row_group(0).unwrap();
2340
2341        // use 'int_col', Boundary order: ASCENDING, total 325 pages.
2342        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2343
2344        let mut vec = vec![];
2345
2346        for i in 0..325 {
2347            if i % 2 == 0 {
2348                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2349            } else {
2350                column_page_reader.skip_next_page().unwrap();
2351            }
2352        }
2353        // check that all pages have been read
2354        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2355        assert!(column_page_reader.get_next_page().unwrap().is_none());
2356
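        // 325 pages, even indices (0, 2, ..., 324) read: (325 + 1) / 2 = 163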
2357        assert_eq!(vec.len(), 163);
2358    }
2359
2360    #[test]
2361    fn test_skip_page_without_offset_index() {
2362        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
2363
2364        // use default SerializedFileReader without read offsetIndex
2365        let reader_result = SerializedFileReader::new(test_file);
2366        let reader = reader_result.unwrap();
2367
2368        let row_group_reader = reader.get_row_group(0).unwrap();
2369
2370        // use 'int_col', Boundary order: ASCENDING, total 325 pages.
2371        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
2372
2373        let mut vec = vec![];
2374
2375        for i in 0..325 {
2376            if i % 2 == 0 {
2377                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
2378            } else {
2379                column_page_reader.peek_next_page().unwrap().unwrap();
2380                column_page_reader.skip_next_page().unwrap();
2381            }
2382        }
2383        // check that all pages have been read
2384        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2385        assert!(column_page_reader.get_next_page().unwrap().is_none());
2386
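        // 325 pages, even indices (0, 2, ..., 324) read: (325 + 1) / 2 = 163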
2387        assert_eq!(vec.len(), 163);
2388    }
2389
2390    #[test]
2391    fn test_peek_page_with_dictionary_page() {
2392        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2393        let builder = ReadOptionsBuilder::new();
2394        // enable reading the page index
2395        let options = builder.with_page_index().build();
2396        let reader_result = SerializedFileReader::new_with_options(test_file, options);
2397        let reader = reader_result.unwrap();
2398        let row_group_reader = reader.get_row_group(0).unwrap();
2399
2400        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
2401        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2402
2403        let mut vec = vec![];
2404
2405        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2406        assert!(meta.is_dict);
2407        let page = column_page_reader.get_next_page().unwrap().unwrap();
2408        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2409
2410        for i in 0..352 {
2411            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2412            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
2413            // num_rows per page is either 21 or 20, except for the last page
2414            if i != 351 {
2415                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
2416            } else {
2417                // the last page's first row index is 7290 and the total row count is 7300,
2418                // so the last page (rows 7290..7299) holds 10 rows
2419                assert_eq!(meta.num_rows, Some(10));
2420            }
2421            assert!(!meta.is_dict);
2422            vec.push(meta);
2423            let page = column_page_reader.get_next_page().unwrap().unwrap();
2424            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2425        }
2426
2427        // check that all pages have been read
2428        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2429        assert!(column_page_reader.get_next_page().unwrap().is_none());
2430
2431        assert_eq!(vec.len(), 352);
2432    }
2433
2434    #[test]
2435    fn test_peek_page_with_dictionary_page_without_offset_index() {
2436        let test_file = get_test_file("alltypes_tiny_pages.parquet");
2437
2438        let reader_result = SerializedFileReader::new(test_file);
2439        let reader = reader_result.unwrap();
2440        let row_group_reader = reader.get_row_group(0).unwrap();
2441
2442        // use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
2443        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
2444
2445        let mut vec = vec![];
2446
2447        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2448        assert!(meta.is_dict);
2449        let page = column_page_reader.get_next_page().unwrap().unwrap();
2450        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
2451
2452        for i in 0..352 {
2453            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
2454            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
2455            // num_levels per page is either 21 or 20, except for the last page
2456            if i != 351 {
2457                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
2458            } else {
2459                // the last page's first row index is 7290 and the total row count is 7300,
2460                // so the last page (rows 7290..7299) holds 10 rows, hence 10 levels
2461                assert_eq!(meta.num_levels, Some(10));
2462            }
2463            assert!(!meta.is_dict);
2464            vec.push(meta);
2465            let page = column_page_reader.get_next_page().unwrap().unwrap();
2466            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
2467        }
2468
2469        // check that all pages have been read
2470        assert!(column_page_reader.peek_next_page().unwrap().is_none());
2471        assert!(column_page_reader.get_next_page().unwrap().is_none());
2472
2473        assert_eq!(vec.len(), 352);
2474    }
2475
2476    #[test]
2477    fn test_fixed_length_index() {
2478        let message_type = "
2479        message test_schema {
2480          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
2481        }
2482        ";
2483
2484        let schema = parse_message_type(message_type).unwrap();
2485        let mut out = Vec::with_capacity(1024);
2486        let mut writer =
2487            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
2488
2489        let mut r = writer.next_row_group().unwrap();
2490        let mut c = r.next_column().unwrap().unwrap();
2491        c.typed::<FixedLenByteArrayType>()
2492            .write_batch(
2493                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
2494                Some(&[1, 1, 0, 1]),
2495                None,
2496            )
2497            .unwrap();
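        // def_levels [1, 1, 0, 1] mark the third row as null, so the page
        // index read back below should report null_count == 1 with
        // min == [0; 11] and max == [5; 11]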
2498        c.close().unwrap();
2499        r.close().unwrap();
2500        writer.close().unwrap();
2501
2502        let b = Bytes::from(out);
2503        let options = ReadOptionsBuilder::new().with_page_index().build();
2504        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
2505        let index = reader.metadata().column_index().unwrap();
2506
2507        // 1 row group
2508        assert_eq!(index.len(), 1);
2509        let c = &index[0];
2510        // 1 column
2511        assert_eq!(c.len(), 1);
2512
2513        match &c[0] {
2514            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
2515                assert_eq!(v.num_pages(), 1);
2516                assert_eq!(v.null_count(0).unwrap(), 1);
2517                assert_eq!(v.min_value(0).unwrap(), &[0; 11]);
2518                assert_eq!(v.max_value(0).unwrap(), &[5; 11]);
2519            }
2520            _ => unreachable!(),
2521        }
2522    }
2523
2524    #[test]
2525    fn test_multi_gz() {
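        // `concatenated_gzip_members.parquet` holds page data made of
        // back-to-back gzip members; the codec must keep inflating past the
        // first member boundary to recover all 513 values checked below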
2526        let file = get_test_file("concatenated_gzip_members.parquet");
2527        let reader = SerializedFileReader::new(file).unwrap();
2528        let row_group_reader = reader.get_row_group(0).unwrap();
2529        match row_group_reader.get_column_reader(0).unwrap() {
2530            ColumnReader::Int64ColumnReader(mut reader) => {
2531                let mut buffer = Vec::with_capacity(1024);
2532                let mut def_levels = Vec::with_capacity(1024);
2533                let (num_records, num_values, num_levels) = reader
2534                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
2535                    .unwrap();
2536
2537                assert_eq!(num_records, 513);
2538                assert_eq!(num_values, 513);
2539                assert_eq!(num_levels, 513);
2540
2541                let expected: Vec<i64> = (1..514).collect();
2542                assert_eq!(&buffer, &expected);
2543            }
2544            _ => unreachable!(),
2545        }
2546    }
2547
2548    #[test]
2549    fn test_byte_stream_split_extended() {
2550        let path = format!(
2551            "{}/byte_stream_split_extended.gzip.parquet",
2552            arrow::util::test_util::parquet_test_data(),
2553        );
2554        let file = File::open(path).unwrap();
2555        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
2556
2557        // Use full schema as projected schema
2558        let mut iter = reader
2559            .get_row_iter(None)
2560            .expect("Failed to create row iterator");
2561
2562        let mut start = 0;
2563        let end = reader.metadata().file_metadata().num_rows();
2564
2565        let check_row = |row: Result<Row, ParquetError>| {
2566            assert!(row.is_ok());
2567            let r = row.unwrap();
2568            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
2569            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
2570            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
2571            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
2572            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
2573            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
2574            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
2575        };
2576
2577        while start < end {
2578            match iter.next() {
2579                Some(row) => check_row(row),
2580                None => break,
2581            };
2582            start += 1;
2583        }
2584    }
2585
2586    #[test]
2587    fn test_filtered_rowgroup_metadata() {
2588        let message_type = "
2589            message test_schema {
2590                REQUIRED INT32 a;
2591            }
2592        ";
2593        let schema = Arc::new(parse_message_type(message_type).unwrap());
2594        let props = Arc::new(
2595            WriterProperties::builder()
2596                .set_statistics_enabled(EnabledStatistics::Page)
2597                .build(),
2598        );
2599        let mut file: File = tempfile::tempfile().unwrap();
2600        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2601        let data = [1, 2, 3, 4, 5];
2602
2603        // write 5 row groups
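        // each row group stores `data` scaled by (ordinal + 1), giving every
        // group distinct min/max statistics for the index checks below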
2604        for idx in 0..5 {
2605            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2606            let mut row_group_writer = file_writer.next_row_group().unwrap();
2607            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2608                writer
2609                    .typed::<Int32Type>()
2610                    .write_batch(data_i.as_slice(), None, None)
2611                    .unwrap();
2612                writer.close().unwrap();
2613            }
2614            row_group_writer.close().unwrap();
2615            file_writer.flushed_row_groups();
2616        }
2617        let file_metadata = file_writer.close().unwrap();
2618
2619        assert_eq!(file_metadata.file_metadata().num_rows(), 25);
2620        assert_eq!(file_metadata.num_row_groups(), 5);
2621
2622        // read only the 3rd row group
2623        let read_options = ReadOptionsBuilder::new()
2624            .with_page_index()
2625            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2626            .build();
2627        let reader =
2628            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2629                .unwrap();
2630        let metadata = reader.metadata();
2631
2632        // check we got the expected row group
2633        assert_eq!(metadata.num_row_groups(), 1);
2634        assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2635
2636        // check we only got the relevant page indexes
2637        assert!(metadata.column_index().is_some());
2638        assert!(metadata.offset_index().is_some());
2639        assert_eq!(metadata.column_index().unwrap().len(), 1);
2640        assert_eq!(metadata.offset_index().unwrap().len(), 1);
2641        let col_idx = metadata.column_index().unwrap();
2642        let off_idx = metadata.offset_index().unwrap();
2643        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2644        let pg_idx = &col_idx[0][0];
2645        let off_idx_i = &off_idx[0][0];
2646
2647        // test that we got the index matching the row group
2648        match pg_idx {
2649            ColumnIndexMetaData::INT32(int_idx) => {
2650                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2651                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2652                assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2653                assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2654            }
2655            _ => panic!("wrong stats type"),
2656        }
2657
2658        // check offset index matches too
2659        assert_eq!(
2660            off_idx_i.page_locations[0].offset,
2661            metadata.row_group(0).column(0).data_page_offset()
2662        );
2663
2664        // read non-contiguous row groups
2665        let read_options = ReadOptionsBuilder::new()
2666            .with_page_index()
2667            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2668            .build();
2669        let reader =
2670            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2671                .unwrap();
2672        let metadata = reader.metadata();
2673
2674        // check we got the expected row groups
2675        assert_eq!(metadata.num_row_groups(), 2);
2676        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2677        assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2678
2679        // check we only got the relevant page indexes
2680        assert!(metadata.column_index().is_some());
2681        assert!(metadata.offset_index().is_some());
2682        assert_eq!(metadata.column_index().unwrap().len(), 2);
2683        assert_eq!(metadata.offset_index().unwrap().len(), 2);
2684        let col_idx = metadata.column_index().unwrap();
2685        let off_idx = metadata.offset_index().unwrap();
2686
2687        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2688            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2689            let pg_idx = &col_idx_i[0];
2690            let off_idx_i = &off_idx[i][0];
2691
2692            // test that we got the index matching the row group
2693            match pg_idx {
2694                ColumnIndexMetaData::INT32(int_idx) => {
2695                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2696                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2697                    assert_eq!(int_idx.min_value(0), Some(min).as_ref());
2698                    assert_eq!(int_idx.max_value(0), Some(max).as_ref());
2699                }
2700                _ => panic!("wrong stats type"),
2701            }
2702
2703            // check offset index matches too
2704            assert_eq!(
2705                off_idx_i.page_locations[0].offset,
2706                metadata.row_group(i).column(0).data_page_offset()
2707            );
2708        }
2709    }
2710
2711    #[test]
2712    fn test_reuse_schema() {
2713        let file = get_test_file("alltypes_plain.parquet");
2714        let file_reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
2715        let schema = file_reader.metadata().file_metadata().schema_descr_ptr();
2716        let expected = file_reader.metadata;
2717
2718        let options = ReadOptionsBuilder::new()
2719            .with_parquet_schema(schema)
2720            .build();
2721        let file_reader = SerializedFileReader::new_with_options(file, options).unwrap();
2722
2723        assert_eq!(expected.as_ref(), file_reader.metadata.as_ref());
2724        // Should have used the same schema instance
2725        assert!(Arc::ptr_eq(
2726            &expected.file_metadata().schema_descr_ptr(),
2727            &file_reader.metadata.file_metadata().schema_descr_ptr()
2728        ));
2729    }
2730
2731    #[test]
2732    fn test_read_unknown_logical_type() {
2733        let file = get_test_file("unknown-logical-type.parquet");
2734        let reader = SerializedFileReader::new(file).expect("Error opening file");
2735
2736        let schema = reader.metadata().file_metadata().schema_descr();
2737        assert_eq!(
2738            schema.column(0).logical_type_ref(),
2739            Some(&basic::LogicalType::String)
2740        );
2741        assert_eq!(
2742            schema.column(1).logical_type_ref(),
2743            Some(&basic::LogicalType::_Unknown { field_id: 2555 })
2744        );
2745        assert_eq!(schema.column(1).physical_type(), Type::BYTE_ARRAY);
2746
2747        let mut iter = reader
2748            .get_row_iter(None)
2749            .expect("Failed to create row iterator");
2750
2751        let mut num_rows = 0;
2752        while iter.next().is_some() {
2753            num_rows += 1;
2754        }
2755        assert_eq!(num_rows, reader.metadata().file_metadata().num_rows());
2756    }
2757}