// parquet/file/serialized_reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Contains implementations of the reader traits [`FileReader`], [`RowGroupReader`] and [`PageReader`].
//! Also contains implementations of [`ChunkReader`] for files (with buffering) and byte arrays (in memory).
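//!
//! For example, opening a file and iterating over all rows might look like the
//! following sketch (`"data.parquet"` is a placeholder path):
//!
//! ```no_run
//! use std::fs::File;
//! use parquet::file::reader::{FileReader, SerializedFileReader};
//!
//! // Open the file and parse the footer metadata.
//! let file = File::open("data.parquet").unwrap();
//! let reader = SerializedFileReader::new(file).unwrap();
//!
//! // Iterate over all rows using the full file schema.
//! for row in reader.get_row_iter(None).unwrap() {
//!     println!("{}", row.unwrap());
//! }
//! ```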

use std::collections::VecDeque;
use std::iter;
use std::{fs::File, io::Read, path::Path, sync::Arc};

use crate::basic::{Encoding, Type};
use crate::bloom_filter::Sbbf;
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::file::{
    metadata::*,
    properties::{ReaderProperties, ReaderPropertiesPtr},
    reader::*,
    statistics,
};
use crate::format::{PageHeader, PageLocation, PageType};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
use bytes::Bytes;
use thrift::protocol::TCompactInputProtocol;

impl TryFrom<File> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(file: File) -> Result<Self> {
        Self::new(file)
    }
}

impl TryFrom<&Path> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &Path) -> Result<Self> {
        let file = File::open(path)?;
        Self::try_from(file)
    }
}

impl TryFrom<String> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: String) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

impl TryFrom<&str> for SerializedFileReader<File> {
    type Error = ParquetError;

    fn try_from(path: &str) -> Result<Self> {
        Self::try_from(Path::new(&path))
    }
}

/// Conversion into a [`RowIter`]
/// using the full file schema over all row groups.
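///
/// A sketch of consuming a reader this way (the path is a placeholder):
///
/// ```no_run
/// use parquet::file::serialized_reader::SerializedFileReader;
///
/// let reader = SerializedFileReader::try_from("data.parquet").unwrap();
/// for row in reader {
///     println!("{}", row.unwrap());
/// }
/// ```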
impl IntoIterator for SerializedFileReader<File> {
    type Item = Result<Row>;
    type IntoIter = RowIter<'static>;

    fn into_iter(self) -> Self::IntoIter {
        RowIter::from_file_into(Box::new(self))
    }
}

// ----------------------------------------------------------------------
// Implementations of file & row group readers

/// A serialized implementation for Parquet [`FileReader`].
pub struct SerializedFileReader<R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: Arc<ParquetMetaData>,
    props: ReaderPropertiesPtr,
}

/// A predicate for filtering row groups, invoked with the metadata and index
/// of each row group in the file. Only row groups for which the predicate
/// evaluates to `true` will be scanned.
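///
/// For example, a predicate that keeps only non-empty row groups might be
/// built like this sketch:
///
/// ```
/// use parquet::file::metadata::RowGroupMetaData;
/// use parquet::file::serialized_reader::ReadGroupPredicate;
///
/// let predicate: ReadGroupPredicate =
///     Box::new(|rg: &RowGroupMetaData, _index| rg.num_rows() > 0);
/// ```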
pub type ReadGroupPredicate = Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>;

/// A builder for [`ReadOptions`].
///
/// Predicates added to the builder are chained with a logical `AND` when
/// filtering row groups.
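///
/// A typical construction (a sketch; the closure is illustrative):
///
/// ```
/// use parquet::file::metadata::RowGroupMetaData;
/// use parquet::file::serialized_reader::ReadOptionsBuilder;
///
/// let options = ReadOptionsBuilder::new()
///     .with_predicate(Box::new(|rg: &RowGroupMetaData, _index| rg.num_rows() > 0))
///     .with_page_index()
///     .build();
/// ```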
#[derive(Default)]
pub struct ReadOptionsBuilder {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: Option<ReaderProperties>,
}

impl ReadOptionsBuilder {
    /// New builder
    pub fn new() -> Self {
        Self::default()
    }

    /// Add a predicate on row group metadata to the read options.
    /// Only row groups matching the predicate will be scanned.
    pub fn with_predicate(mut self, predicate: ReadGroupPredicate) -> Self {
        self.predicates.push(predicate);
        self
    }

    /// Add a predicate that keeps only row groups whose midpoint offsets lie
    /// within the half-open range `[start, end)`, i.e. `{x | start <= x < end}`.
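    ///
    /// For example (a sketch; useful offsets depend on the file layout):
    ///
    /// ```
    /// use parquet::file::serialized_reader::ReadOptionsBuilder;
    ///
    /// // Keep row groups whose midpoint offset is in [0, 1024).
    /// let options = ReadOptionsBuilder::new().with_range(0, 1024).build();
    /// ```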
    pub fn with_range(mut self, start: i64, end: i64) -> Self {
        assert!(start < end);
        let predicate = move |rg: &RowGroupMetaData, _: usize| {
            let mid = get_midpoint_offset(rg);
            mid >= start && mid < end
        };
        self.predicates.push(Box::new(predicate));
        self
    }

    /// Enable reading the page index structures described in
    /// "[Column Index] Layout to Support Page Skipping"
    ///
    /// [Column Index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
    pub fn with_page_index(mut self) -> Self {
        self.enable_page_index = true;
        self
    }

    /// Set the [`ReaderProperties`] configuration.
    pub fn with_reader_properties(mut self, properties: ReaderProperties) -> Self {
        self.props = Some(properties);
        self
    }

    /// Seal the builder and return the read options
    pub fn build(self) -> ReadOptions {
        let props = self
            .props
            .unwrap_or_else(|| ReaderProperties::builder().build());
        ReadOptions {
            predicates: self.predicates,
            enable_page_index: self.enable_page_index,
            props,
        }
    }
}

/// A collection of options for reading a Parquet file.
///
/// Currently, only predicates on row group metadata are supported.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
    predicates: Vec<ReadGroupPredicate>,
    enable_page_index: bool,
    props: ReaderProperties,
}

impl<R: 'static + ChunkReader> SerializedFileReader<R> {
    /// Creates a file reader from a Parquet file.
    /// Returns an error if the Parquet file does not exist or is corrupt.
    pub fn new(chunk_reader: R) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&chunk_reader)?;
        let props = Arc::new(ReaderProperties::builder().build());
        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props,
        })
    }

    /// Creates a file reader from a Parquet file with read options.
    /// Returns an error if the Parquet file does not exist or is corrupt.
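    ///
    /// A sketch of filtering row groups at open time (`"data.parquet"` is a
    /// placeholder path):
    ///
    /// ```no_run
    /// use std::fs::File;
    /// use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};
    ///
    /// let file = File::open("data.parquet").unwrap();
    /// // Keep only the first row group.
    /// let options = ReadOptionsBuilder::new()
    ///     .with_predicate(Box::new(|_rg, i| i == 0))
    ///     .build();
    /// let reader = SerializedFileReader::new_with_options(file, options).unwrap();
    /// ```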
    pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
        let mut metadata_builder = ParquetMetaDataReader::new()
            .parse_and_finish(&chunk_reader)?
            .into_builder();
        let mut predicates = options.predicates;

        // Filter row groups based on the predicates
        for (i, rg_meta) in metadata_builder.take_row_groups().into_iter().enumerate() {
            let mut keep = true;
            for predicate in &mut predicates {
                if !predicate(&rg_meta, i) {
                    keep = false;
                    break;
                }
            }
            if keep {
                metadata_builder = metadata_builder.add_row_group(rg_meta);
            }
        }

        let mut metadata = metadata_builder.build();

        // If page indexes are desired, build them with the filtered set of row groups
        if options.enable_page_index {
            let mut reader =
                ParquetMetaDataReader::new_with_metadata(metadata).with_page_indexes(true);
            reader.read_page_indexes(&chunk_reader)?;
            metadata = reader.finish()?;
        }

        Ok(Self {
            chunk_reader: Arc::new(chunk_reader),
            metadata: Arc::new(metadata),
            props: Arc::new(options.props),
        })
    }
}

/// Get midpoint offset for a row group
fn get_midpoint_offset(meta: &RowGroupMetaData) -> i64 {
    let col = meta.column(0);
    let mut offset = col.data_page_offset();
    if let Some(dic_offset) = col.dictionary_page_offset() {
        if offset > dic_offset {
            offset = dic_offset
        }
    };
    offset + meta.compressed_size() / 2
}

impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
    fn metadata(&self) -> &ParquetMetaData {
        &self.metadata
    }

    fn num_row_groups(&self) -> usize {
        self.metadata.num_row_groups()
    }

    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>> {
        let row_group_metadata = self.metadata.row_group(i);
        // Row groups should be processed sequentially.
        let props = Arc::clone(&self.props);
        let f = Arc::clone(&self.chunk_reader);
        Ok(Box::new(SerializedRowGroupReader::new(
            f,
            row_group_metadata,
            self.metadata.offset_index().map(|x| x[i].as_slice()),
            props,
        )?))
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
        RowIter::from_file(projection, self)
    }
}

/// A serialized implementation for Parquet [`RowGroupReader`].
pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
    chunk_reader: Arc<R>,
    metadata: &'a RowGroupMetaData,
    offset_index: Option<&'a [OffsetIndexMetaData]>,
    props: ReaderPropertiesPtr,
    bloom_filters: Vec<Option<Sbbf>>,
}

impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
    /// Creates new row group reader from a file, row group metadata and custom config.
    pub fn new(
        chunk_reader: Arc<R>,
        metadata: &'a RowGroupMetaData,
        offset_index: Option<&'a [OffsetIndexMetaData]>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let bloom_filters = if props.read_bloom_filter() {
            metadata
                .columns()
                .iter()
                .map(|col| Sbbf::read_from_column_chunk(col, chunk_reader.clone()))
                .collect::<Result<Vec<_>>>()?
        } else {
            iter::repeat(None).take(metadata.columns().len()).collect()
        };
        Ok(Self {
            chunk_reader,
            metadata,
            offset_index,
            props,
            bloom_filters,
        })
    }
}

impl<R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'_, R> {
    fn metadata(&self) -> &RowGroupMetaData {
        self.metadata
    }

    fn num_columns(&self) -> usize {
        self.metadata.num_columns()
    }

    // TODO: fix PARQUET-816
    fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
        let col = self.metadata.column(i);

        let page_locations = self.offset_index.map(|x| x[i].page_locations.clone());

        let props = Arc::clone(&self.props);
        Ok(Box::new(SerializedPageReader::new_with_properties(
            Arc::clone(&self.chunk_reader),
            col,
            self.metadata.num_rows() as usize,
            page_locations,
            props,
        )?))
    }

    /// get bloom filter for the `i`th column
    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
        self.bloom_filters[i].as_ref()
    }

    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
        RowIter::from_row_group(projection, self)
    }
}

/// Reads a [`PageHeader`] from the provided [`Read`]
pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
    let mut prot = TCompactInputProtocol::new(input);
    let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
    Ok(page_header)
}

/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read
fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
    /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
    struct TrackedRead<R> {
        inner: R,
        bytes_read: usize,
    }

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.inner.read(buf)?;
            self.bytes_read += v;
            Ok(v)
        }
    }

    let mut tracked = TrackedRead {
        inner: input,
        bytes_read: 0,
    };
    let header = read_page_header(&mut tracked)?;
    Ok((tracked.bytes_read, header))
}

/// Decodes a [`Page`] from the provided `buffer`
pub(crate) fn decode_page(
    page_header: PageHeader,
    buffer: Bytes,
    physical_type: Type,
    decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
    // Verify the 32-bit CRC checksum of the page
    #[cfg(feature = "crc")]
    if let Some(expected_crc) = page_header.crc {
        let crc = crc32fast::hash(&buffer);
        if crc != expected_crc as u32 {
            return Err(general_err!("Page CRC checksum mismatch"));
        }
    }

    // When processing a data page v2, depending on the compression enabled for
    // the page, we should account for the uncompressed prefix ('offset') holding
    // the repetition and definition levels.
    //
    // We always use a 0 offset for pages other than v2; the `true` flag means
    // that decompression will be applied if a decompressor is defined.
    let mut offset: usize = 0;
    let mut can_decompress = true;

    if let Some(ref header_v2) = page_header.data_page_header_v2 {
        offset = (header_v2.definition_levels_byte_length + header_v2.repetition_levels_byte_length)
            as usize;
        // When the is_compressed flag is missing, the page is considered compressed
        can_decompress = header_v2.is_compressed.unwrap_or(true);
    }
402
403    // TODO: page header could be huge because of statistics. We should set a
404    // maximum page header size and abort if that is exceeded.
405    let buffer = match decompressor {
406        Some(decompressor) if can_decompress => {
407            let uncompressed_size = page_header.uncompressed_page_size as usize;
408            let mut decompressed = Vec::with_capacity(uncompressed_size);
409            let compressed = &buffer.as_ref()[offset..];
410            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
411            decompressor.decompress(
412                compressed,
413                &mut decompressed,
414                Some(uncompressed_size - offset),
415            )?;
416
417            if decompressed.len() != uncompressed_size {
418                return Err(general_err!(
419                    "Actual decompressed size doesn't match the expected one ({} vs {})",
420                    decompressed.len(),
421                    uncompressed_size
422                ));
423            }
424
425            Bytes::from(decompressed)
426        }
427        _ => buffer,
428    };
429
430    let result = match page_header.type_ {
431        PageType::DICTIONARY_PAGE => {
432            let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| {
433                ParquetError::General("Missing dictionary page header".to_string())
434            })?;
435            let is_sorted = dict_header.is_sorted.unwrap_or(false);
436            Page::DictionaryPage {
437                buf: buffer,
438                num_values: dict_header.num_values.try_into()?,
439                encoding: Encoding::try_from(dict_header.encoding)?,
440                is_sorted,
441            }
442        }
443        PageType::DATA_PAGE => {
444            let header = page_header
445                .data_page_header
446                .ok_or_else(|| ParquetError::General("Missing V1 data page header".to_string()))?;
447            Page::DataPage {
448                buf: buffer,
449                num_values: header.num_values.try_into()?,
450                encoding: Encoding::try_from(header.encoding)?,
451                def_level_encoding: Encoding::try_from(header.definition_level_encoding)?,
452                rep_level_encoding: Encoding::try_from(header.repetition_level_encoding)?,
453                statistics: statistics::from_thrift(physical_type, header.statistics)?,
454            }
455        }
456        PageType::DATA_PAGE_V2 => {
457            let header = page_header
458                .data_page_header_v2
459                .ok_or_else(|| ParquetError::General("Missing V2 data page header".to_string()))?;
460            let is_compressed = header.is_compressed.unwrap_or(true);
461            Page::DataPageV2 {
462                buf: buffer,
463                num_values: header.num_values.try_into()?,
464                encoding: Encoding::try_from(header.encoding)?,
465                num_nulls: header.num_nulls.try_into()?,
466                num_rows: header.num_rows.try_into()?,
467                def_levels_byte_len: header.definition_levels_byte_length.try_into()?,
468                rep_levels_byte_len: header.repetition_levels_byte_length.try_into()?,
469                is_compressed,
470                statistics: statistics::from_thrift(physical_type, header.statistics)?,
471            }
472        }
        _ => {
            // Other page types (e.g., INDEX_PAGE) are skipped by the callers in
            // this module before decoding, so they are unsupported here.
            unimplemented!("Page type {:?} is not supported", page_header.type_)
        }
    };

    Ok(result)
}

enum SerializedPageReaderState {
    Values {
        /// The current byte offset in the reader
        offset: usize,

        /// The length of the chunk in bytes
        remaining_bytes: usize,

        /// If the next page header has already been "peeked", it is cached here
        next_page_header: Option<Box<PageHeader>>,
    },
    Pages {
        /// Remaining page locations
        page_locations: VecDeque<PageLocation>,
        /// Remaining dictionary location if any
        dictionary_page: Option<PageLocation>,
        /// The total number of rows in this column chunk
        total_rows: usize,
    },
}

/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader<R: ChunkReader> {
    /// The chunk reader
    reader: Arc<R>,

    /// The compression codec for this column chunk. `None` if the chunk is uncompressed.
    decompressor: Option<Box<dyn Codec>>,

    /// Column chunk type.
    physical_type: Type,

    state: SerializedPageReaderState,
}

impl<R: ChunkReader> SerializedPageReader<R> {
    /// Creates a new serialized page reader from a chunk reader and metadata
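    ///
    /// A sketch of reading every page of the first column chunk in row group 0
    /// (`"data.parquet"` is a placeholder path):
    ///
    /// ```no_run
    /// use std::{fs::File, sync::Arc};
    /// use parquet::file::reader::FileReader;
    /// use parquet::file::serialized_reader::{SerializedFileReader, SerializedPageReader};
    ///
    /// let reader = SerializedFileReader::new(File::open("data.parquet").unwrap()).unwrap();
    /// let metadata = reader.metadata().clone();
    /// let row_group = metadata.row_group(0);
    ///
    /// let chunk_reader = Arc::new(File::open("data.parquet").unwrap());
    /// let pages = SerializedPageReader::new(
    ///     chunk_reader,
    ///     row_group.column(0),
    ///     row_group.num_rows() as usize,
    ///     None, // no page index available
    /// )
    /// .unwrap();
    ///
    /// // `SerializedPageReader` is an `Iterator` over `Result<Page>`.
    /// for page in pages {
    ///     println!("page with {} values", page.unwrap().num_values());
    /// }
    /// ```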
    pub fn new(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
    ) -> Result<Self> {
        let props = Arc::new(ReaderProperties::builder().build());
        SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props)
    }

    /// Creates a new serialized page reader with custom reader properties.
    pub fn new_with_properties(
        reader: Arc<R>,
        meta: &ColumnChunkMetaData,
        total_rows: usize,
        page_locations: Option<Vec<PageLocation>>,
        props: ReaderPropertiesPtr,
    ) -> Result<Self> {
        let decompressor = create_codec(meta.compression(), props.codec_options())?;
        let (start, len) = meta.byte_range();

        let state = match page_locations {
            Some(locations) => {
                let dictionary_page = match locations.first() {
                    Some(dict_offset) if dict_offset.offset as u64 != start => Some(PageLocation {
                        offset: start as i64,
                        compressed_page_size: (dict_offset.offset as u64 - start) as i32,
                        first_row_index: 0,
                    }),
                    _ => None,
                };

                SerializedPageReaderState::Pages {
                    page_locations: locations.into(),
                    dictionary_page,
                    total_rows,
                }
            }
            None => SerializedPageReaderState::Values {
                offset: start as usize,
                remaining_bytes: len as usize,
                next_page_header: None,
            },
        };

        Ok(Self {
            reader,
            decompressor,
            state,
            physical_type: meta.column_type(),
        })
    }

    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
    /// Unlike page metadata, an offset can uniquely identify a page.
    ///
    /// This is used when reading Parquet with a row filter, where we want to avoid decompressing
    /// a page twice; the offset makes it possible to check whether the next page has already been
    /// cached or read.
    #[cfg(test)]
    fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
                            Ok(Some(*offset))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                ..
            } => {
                if let Some(page) = dictionary_page {
                    Ok(Some(page.offset as usize))
                } else if let Some(page) = page_locations.front() {
                    Ok(Some(page.offset as usize))
                } else {
                    Ok(None)
                }
            }
        }
    }
}

impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
    type Item = Result<Page>;

    fn next(&mut self) -> Option<Self::Item> {
        self.get_next_page().transpose()
    }
}

fn verify_page_header_len(header_len: usize, remaining_bytes: usize) -> Result<()> {
    if header_len > remaining_bytes {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

fn verify_page_size(
    compressed_size: i32,
    uncompressed_size: i32,
    remaining_bytes: usize,
) -> Result<()> {
    // The page's compressed size should not exceed the remaining bytes that are
    // available to read. The page's uncompressed size is the expected size
    // after decompression, which can never be negative.
    if compressed_size < 0 || compressed_size as usize > remaining_bytes || uncompressed_size < 0 {
        return Err(eof_err!("Invalid page header"));
    }
    Ok(())
}

impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        loop {
            let page = match &mut self.state {
                SerializedPageReaderState::Values {
                    offset,
                    remaining_bytes: remaining,
                    next_page_header,
                } => {
                    if *remaining == 0 {
                        return Ok(None);
                    }

                    let mut read = self.reader.get_read(*offset as u64)?;
                    let header = if let Some(header) = next_page_header.take() {
                        *header
                    } else {
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        verify_page_header_len(header_len, *remaining)?;
                        *offset += header_len;
                        *remaining -= header_len;
                        header
                    };
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining,
                    )?;
                    let data_len = header.compressed_page_size as usize;
                    *offset += data_len;
                    *remaining -= data_len;

                    if header.type_ == PageType::INDEX_PAGE {
                        continue;
                    }

                    let mut buffer = Vec::with_capacity(data_len);
                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;

                    if read != data_len {
                        return Err(eof_err!(
                            "Expected to read {} bytes of page, read only {}",
                            data_len,
                            read
                        ));
                    }

                    decode_page(
                        header,
                        Bytes::from(buffer),
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
                SerializedPageReaderState::Pages {
                    page_locations,
                    dictionary_page,
                    ..
                } => {
                    let front = match dictionary_page
                        .take()
                        .or_else(|| page_locations.pop_front())
                    {
                        Some(front) => front,
                        None => return Ok(None),
                    };

                    let page_len = front.compressed_page_size as usize;

                    let buffer = self.reader.get_bytes(front.offset as u64, page_len)?;

                    let mut prot = TCompactSliceInputProtocol::new(buffer.as_ref());
                    let header = PageHeader::read_from_in_protocol(&mut prot)?;
                    let offset = buffer.len() - prot.as_slice().len();

                    let bytes = buffer.slice(offset..);
                    decode_page(
                        header,
                        bytes,
                        self.physical_type,
                        self.decompressor.as_mut(),
                    )?
                }
            };

            return Ok(Some(page));
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
            } => {
                loop {
                    if *remaining_bytes == 0 {
                        return Ok(None);
                    }
                    return if let Some(header) = next_page_header.as_ref() {
                        if let Ok(page_meta) = (&**header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            *next_page_header = None;
                            continue;
                        }
                    } else {
                        let mut read = self.reader.get_read(*offset as u64)?;
                        let (header_len, header) = read_page_header_len(&mut read)?;
                        verify_page_header_len(header_len, *remaining_bytes)?;
                        *offset += header_len;
                        *remaining_bytes -= header_len;
                        let page_meta = if let Ok(page_meta) = (&header).try_into() {
                            Ok(Some(page_meta))
                        } else {
                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                            continue;
                        };
                        *next_page_header = Some(Box::new(header));
                        page_meta
                    };
                }
            }
            SerializedPageReaderState::Pages {
                page_locations,
                dictionary_page,
                total_rows,
            } => {
                if dictionary_page.is_some() {
                    Ok(Some(PageMetadata {
                        num_rows: None,
                        num_levels: None,
                        is_dict: true,
                    }))
                } else if let Some(page) = page_locations.front() {
                    let next_rows = page_locations
                        .get(1)
                        .map(|x| x.first_row_index as usize)
                        .unwrap_or(*total_rows);

                    Ok(Some(PageMetadata {
                        num_rows: Some(next_rows - page.first_row_index as usize),
                        num_levels: None,
                        is_dict: false,
                    }))
                } else {
                    Ok(None)
                }
            }
        }
    }

    fn skip_next_page(&mut self) -> Result<()> {
        match &mut self.state {
            SerializedPageReaderState::Values {
                offset,
                remaining_bytes,
                next_page_header,
            } => {
                if let Some(buffered_header) = next_page_header.take() {
                    verify_page_size(
                        buffered_header.compressed_page_size,
                        buffered_header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    // The next page header has already been peeked, so just advance the offset
                    *offset += buffered_header.compressed_page_size as usize;
                    *remaining_bytes -= buffered_header.compressed_page_size as usize;
                } else {
                    let mut read = self.reader.get_read(*offset as u64)?;
                    let (header_len, header) = read_page_header_len(&mut read)?;
                    verify_page_header_len(header_len, *remaining_bytes)?;
                    verify_page_size(
                        header.compressed_page_size,
                        header.uncompressed_page_size,
                        *remaining_bytes,
                    )?;
                    let data_page_size = header.compressed_page_size as usize;
                    *offset += header_len + data_page_size;
                    *remaining_bytes -= header_len + data_page_size;
                }
                Ok(())
            }
            SerializedPageReaderState::Pages { page_locations, .. } => {
                page_locations.pop_front();

                Ok(())
            }
        }
    }

    fn at_record_boundary(&mut self) -> Result<bool> {
        match &mut self.state {
            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
            SerializedPageReaderState::Pages { .. } => Ok(true),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use bytes::Buf;

    use crate::file::properties::{EnabledStatistics, WriterProperties};
    use crate::format::BoundaryOrder;

    use crate::basic::{self, ColumnOrder};
    use crate::column::reader::ColumnReader;
    use crate::data_type::private::ParquetValueType;
    use crate::data_type::{AsBytes, FixedLenByteArrayType, Int32Type};
    use crate::file::page_index::index::{Index, NativeIndex};
    use crate::file::page_index::index_reader::{read_columns_indexes, read_offset_indexes};
    use crate::file::writer::SerializedFileWriter;
    use crate::record::RowAccessor;
    use crate::schema::parser::parse_message_type;
    use crate::util::test_common::file_util::{get_test_file, get_test_path};

    use super::*;

    #[test]
    fn test_cursor_and_file_has_the_same_behaviour() {
        let mut buf: Vec<u8> = Vec::new();
        get_test_file("alltypes_plain.parquet")
            .read_to_end(&mut buf)
            .unwrap();
        let cursor = Bytes::from(buf);
        let read_from_cursor = SerializedFileReader::new(cursor).unwrap();

        let test_file = get_test_file("alltypes_plain.parquet");
        let read_from_file = SerializedFileReader::new(test_file).unwrap();

        let file_iter = read_from_file.get_row_iter(None).unwrap();
        let cursor_iter = read_from_cursor.get_row_iter(None).unwrap();

        for (a, b) in file_iter.zip(cursor_iter) {
            assert_eq!(a.unwrap(), b.unwrap())
        }
    }

    #[test]
    fn test_file_reader_try_from() {
        // Valid file path
        let test_file = get_test_file("alltypes_plain.parquet");
        let test_path_buf = get_test_path("alltypes_plain.parquet");
        let test_path = test_path_buf.as_path();
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_file);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_ok());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_ok());

        // Invalid file path
        let test_path = Path::new("invalid.parquet");
        let test_path_str = test_path.to_str().unwrap();

        let reader = SerializedFileReader::try_from(test_path);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str);
        assert!(reader.is_err());

        let reader = SerializedFileReader::try_from(test_path_str.to_string());
        assert!(reader.is_err());
    }

    #[test]
    fn test_file_reader_into_iter() {
        let path = get_test_path("alltypes_plain.parquet");
        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
        let iter = reader.into_iter();
        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();

        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

945
946    #[test]
947    fn test_file_reader_into_iter_project() {
948        let path = get_test_path("alltypes_plain.parquet");
949        let reader = SerializedFileReader::try_from(path.as_path()).unwrap();
950        let schema = "message schema { OPTIONAL INT32 id; }";
951        let proj = parse_message_type(schema).ok();
952        let iter = reader.into_iter().project(proj).unwrap();
953        let values: Vec<_> = iter.flat_map(|x| x.unwrap().get_int(0)).collect();
954
955        assert_eq!(values, &[4, 5, 6, 7, 2, 3, 0, 1]);
956    }
957
958    #[test]
959    fn test_reuse_file_chunk() {
960        // This test covers the case of maintaining the correct start position in a file
961        // stream for each column reader after initializing and moving to the next one
962        // (without necessarily reading the entire column).
963        let test_file = get_test_file("alltypes_plain.parquet");
964        let reader = SerializedFileReader::new(test_file).unwrap();
965        let row_group = reader.get_row_group(0).unwrap();
966
967        let mut page_readers = Vec::new();
968        for i in 0..row_group.num_columns() {
969            page_readers.push(row_group.get_column_page_reader(i).unwrap());
970        }
971
972        // Now buffer each col reader, we do not expect any failures like:
973        // General("underlying Thrift error: end of file")
974        for mut page_reader in page_readers {
975            assert!(page_reader.get_next_page().is_ok());
976        }
977    }
978
979    #[test]
980    fn test_file_reader() {
981        let test_file = get_test_file("alltypes_plain.parquet");
982        let reader_result = SerializedFileReader::new(test_file);
983        assert!(reader_result.is_ok());
984        let reader = reader_result.unwrap();
985
986        // Test contents in Parquet metadata
987        let metadata = reader.metadata();
988        assert_eq!(metadata.num_row_groups(), 1);
989
990        // Test contents in file metadata
991        let file_metadata = metadata.file_metadata();
992        assert!(file_metadata.created_by().is_some());
993        assert_eq!(
994            file_metadata.created_by().unwrap(),
995            "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
996        );
997        assert!(file_metadata.key_value_metadata().is_none());
998        assert_eq!(file_metadata.num_rows(), 8);
999        assert_eq!(file_metadata.version(), 1);
1000        assert_eq!(file_metadata.column_orders(), None);
1001
1002        // Test contents in row group metadata
1003        let row_group_metadata = metadata.row_group(0);
1004        assert_eq!(row_group_metadata.num_columns(), 11);
1005        assert_eq!(row_group_metadata.num_rows(), 8);
1006        assert_eq!(row_group_metadata.total_byte_size(), 671);
1007        // Check each column order
1008        for i in 0..row_group_metadata.num_columns() {
1009            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
1010        }
1011
1012        // Test row group reader
1013        let row_group_reader_result = reader.get_row_group(0);
1014        assert!(row_group_reader_result.is_ok());
1015        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
1016        assert_eq!(
1017            row_group_reader.num_columns(),
1018            row_group_metadata.num_columns()
1019        );
1020        assert_eq!(
1021            row_group_reader.metadata().total_byte_size(),
1022            row_group_metadata.total_byte_size()
1023        );
1024
1025        // Test page readers
1026        // TODO: test for every column
1027        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
1028        assert!(page_reader_0_result.is_ok());
1029        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
1030        let mut page_count = 0;
1031        while let Ok(Some(page)) = page_reader_0.get_next_page() {
1032            let is_expected_page = match page {
1033                Page::DictionaryPage {
1034                    buf,
1035                    num_values,
1036                    encoding,
1037                    is_sorted,
1038                } => {
1039                    assert_eq!(buf.len(), 32);
1040                    assert_eq!(num_values, 8);
1041                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
1042                    assert!(!is_sorted);
1043                    true
1044                }
1045                Page::DataPage {
1046                    buf,
1047                    num_values,
1048                    encoding,
1049                    def_level_encoding,
1050                    rep_level_encoding,
1051                    statistics,
1052                } => {
1053                    assert_eq!(buf.len(), 11);
1054                    assert_eq!(num_values, 8);
1055                    assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
1056                    assert_eq!(def_level_encoding, Encoding::RLE);
1057                    #[allow(deprecated)]
1058                    let expected_rep_level_encoding = Encoding::BIT_PACKED;
1059                    assert_eq!(rep_level_encoding, expected_rep_level_encoding);
1060                    assert!(statistics.is_none());
1061                    true
1062                }
1063                _ => false,
1064            };
1065            assert!(is_expected_page);
1066            page_count += 1;
1067        }
1068        assert_eq!(page_count, 2);
1069    }
1070
    #[test]
    fn test_file_reader_datapage_v2() {
        let test_file = get_test_file("datapage_v2.snappy.parquet");
        let reader_result = SerializedFileReader::new(test_file);
        assert!(reader_result.is_ok());
        let reader = reader_result.unwrap();

        // Test contents in Parquet metadata
        let metadata = reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);

        // Test contents in file metadata
        let file_metadata = metadata.file_metadata();
        assert!(file_metadata.created_by().is_some());
        assert_eq!(
            file_metadata.created_by().unwrap(),
            "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)"
        );
        assert!(file_metadata.key_value_metadata().is_some());
        assert_eq!(
            file_metadata.key_value_metadata().to_owned().unwrap().len(),
            1
        );

        assert_eq!(file_metadata.num_rows(), 5);
        assert_eq!(file_metadata.version(), 1);
        assert_eq!(file_metadata.column_orders(), None);

        let row_group_metadata = metadata.row_group(0);

        // Check each column order
        for i in 0..row_group_metadata.num_columns() {
            assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED);
        }

        // Test row group reader
        let row_group_reader_result = reader.get_row_group(0);
        assert!(row_group_reader_result.is_ok());
        let row_group_reader: Box<dyn RowGroupReader> = row_group_reader_result.unwrap();
        assert_eq!(
            row_group_reader.num_columns(),
            row_group_metadata.num_columns()
        );
        assert_eq!(
            row_group_reader.metadata().total_byte_size(),
            row_group_metadata.total_byte_size()
        );

        // Test page readers
        // TODO: test for every column
        let page_reader_0_result = row_group_reader.get_column_page_reader(0);
        assert!(page_reader_0_result.is_ok());
        let mut page_reader_0: Box<dyn PageReader> = page_reader_0_result.unwrap();
        let mut page_count = 0;
        while let Ok(Some(page)) = page_reader_0.get_next_page() {
            let is_expected_page = match page {
                Page::DictionaryPage {
                    buf,
                    num_values,
                    encoding,
                    is_sorted,
                } => {
                    assert_eq!(buf.len(), 7);
                    assert_eq!(num_values, 1);
                    assert_eq!(encoding, Encoding::PLAIN);
                    assert!(!is_sorted);
                    true
                }
                Page::DataPageV2 {
                    buf,
                    num_values,
                    encoding,
                    num_nulls,
                    num_rows,
                    def_levels_byte_len,
                    rep_levels_byte_len,
                    is_compressed,
                    statistics,
                } => {
                    assert_eq!(buf.len(), 4);
                    assert_eq!(num_values, 5);
                    assert_eq!(encoding, Encoding::RLE_DICTIONARY);
                    assert_eq!(num_nulls, 1);
                    assert_eq!(num_rows, 5);
                    assert_eq!(def_levels_byte_len, 2);
                    assert_eq!(rep_levels_byte_len, 0);
                    assert!(is_compressed);
                    assert!(statistics.is_some());
                    true
                }
                _ => false,
            };
            assert!(is_expected_page);
            page_count += 1;
        }
        assert_eq!(page_count, 2);
    }

    fn get_serialized_page_reader<R: ChunkReader>(
        file_reader: &SerializedFileReader<R>,
        row_group: usize,
        column: usize,
    ) -> Result<SerializedPageReader<R>> {
        let row_group = {
            let row_group_metadata = file_reader.metadata.row_group(row_group);
            let props = Arc::clone(&file_reader.props);
            let f = Arc::clone(&file_reader.chunk_reader);
            SerializedRowGroupReader::new(
                f,
                row_group_metadata,
                file_reader
                    .metadata
                    .offset_index()
                    .map(|x| x[row_group].as_slice()),
                props,
            )?
        };

        let col = row_group.metadata.column(column);

        let page_locations = row_group
            .offset_index
            .map(|x| x[column].page_locations.clone());

        let props = Arc::clone(&row_group.props);
        SerializedPageReader::new_with_properties(
            Arc::clone(&row_group.chunk_reader),
            col,
            row_group.metadata.num_rows() as usize,
            page_locations,
            props,
        )
    }

    #[test]
    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let reader = SerializedFileReader::new(test_file)?;

        let mut offset_set = HashSet::new();
        let num_row_groups = reader.metadata.num_row_groups();
        for row_group in 0..num_row_groups {
            let num_columns = reader.metadata.row_group(row_group).num_columns();
            for column in 0..num_columns {
                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;

                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
                    match &page_reader.state {
                        SerializedPageReaderState::Pages {
                            page_locations,
                            dictionary_page,
                            ..
                        } => {
                            if let Some(page) = dictionary_page {
                                assert_eq!(page.offset as usize, page_offset);
                            } else if let Some(page) = page_locations.front() {
                                assert_eq!(page.offset as usize, page_offset);
                            } else {
                                unreachable!()
                            }
                        }
                        SerializedPageReaderState::Values {
                            offset,
                            next_page_header,
                            ..
                        } => {
                            assert!(next_page_header.is_some());
                            assert_eq!(*offset, page_offset);
                        }
                    }
                    let page = page_reader.get_next_page()?;
                    assert!(page.is_some());
                    let newly_inserted = offset_set.insert(page_offset);
                    assert!(newly_inserted);
                }
            }
        }

        Ok(())
    }

    #[test]
    fn test_page_iterator() {
        let file = get_test_file("alltypes_plain.parquet");
        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

        let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap();

        // read first page
        let page = page_iterator.next();
        assert!(page.is_some());
        assert!(page.unwrap().is_ok());

        // reach end of file
        let page = page_iterator.next();
        assert!(page.is_none());

        let row_group_indices = Box::new(0..1);
        let mut page_iterator =
            FilePageIterator::with_row_groups(0, row_group_indices, file_reader).unwrap();

        // read first page
        let page = page_iterator.next();
        assert!(page.is_some());
        assert!(page.unwrap().is_ok());

        // reach end of file
        let page = page_iterator.next();
        assert!(page.is_none());
    }

    #[test]
    fn test_file_reader_key_value_metadata() {
        let file = get_test_file("binary.parquet");
        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

        let metadata = file_reader
            .metadata
            .file_metadata()
            .key_value_metadata()
            .unwrap();

        assert_eq!(metadata.len(), 3);

        assert_eq!(metadata[0].key, "parquet.proto.descriptor");

        assert_eq!(metadata[1].key, "writer.model.name");
        assert_eq!(metadata[1].value, Some("protobuf".to_owned()));

        assert_eq!(metadata[2].key, "parquet.proto.class");
        assert_eq!(metadata[2].value, Some("foo.baz.Foobaz$Event".to_owned()));
    }

    #[test]
    fn test_file_reader_optional_metadata() {
        // file with optional metadata: bloom filters, encoding stats, column index and offset index.
        let file = get_test_file("data_index_bloom_encoding_stats.parquet");
        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());

        let row_group_metadata = file_reader.metadata.row_group(0);
        let col0_metadata = row_group_metadata.column(0);

        // test optional bloom filter offset
        assert_eq!(col0_metadata.bloom_filter_offset().unwrap(), 192);

        // test page encoding stats
        let page_encoding_stats = &col0_metadata.page_encoding_stats().unwrap()[0];

        assert_eq!(page_encoding_stats.page_type, basic::PageType::DATA_PAGE);
        assert_eq!(page_encoding_stats.encoding, Encoding::PLAIN);
        assert_eq!(page_encoding_stats.count, 1);

        // test optional column index offset
        assert_eq!(col0_metadata.column_index_offset().unwrap(), 156);
        assert_eq!(col0_metadata.column_index_length().unwrap(), 25);

        // test optional offset index offset
        assert_eq!(col0_metadata.offset_index_offset().unwrap(), 181);
        assert_eq!(col0_metadata.offset_index_length().unwrap(), 11);
    }

    #[test]
    fn test_file_reader_with_no_filter() -> Result<()> {
        let test_file = get_test_file("alltypes_plain.parquet");
        let origin_reader = SerializedFileReader::new(test_file)?;
        // test initial number of row groups
        let metadata = origin_reader.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        Ok(())
    }

1342    #[test]
1343    fn test_file_reader_filter_row_groups_with_predicate() -> Result<()> {
1344        let test_file = get_test_file("alltypes_plain.parquet");
1345        let read_options = ReadOptionsBuilder::new()
1346            .with_predicate(Box::new(|_, _| false))
1347            .build();
1348        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1349        let metadata = reader.metadata();
1350        assert_eq!(metadata.num_row_groups(), 0);
1351        Ok(())
1352    }
1353
1354    #[test]
1355    fn test_file_reader_filter_row_groups_with_range() -> Result<()> {
1356        let test_file = get_test_file("alltypes_plain.parquet");
1357        let origin_reader = SerializedFileReader::new(test_file)?;
1358        // test initial number of row groups
1359        let metadata = origin_reader.metadata();
1360        assert_eq!(metadata.num_row_groups(), 1);
1361        let mid = get_midpoint_offset(metadata.row_group(0));
1362
1363        let test_file = get_test_file("alltypes_plain.parquet");
1364        let read_options = ReadOptionsBuilder::new().with_range(0, mid + 1).build();
1365        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1366        let metadata = reader.metadata();
1367        assert_eq!(metadata.num_row_groups(), 1);
1368
1369        let test_file = get_test_file("alltypes_plain.parquet");
1370        let read_options = ReadOptionsBuilder::new().with_range(0, mid).build();
1371        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1372        let metadata = reader.metadata();
1373        assert_eq!(metadata.num_row_groups(), 0);
1374        Ok(())
1375    }
1376
1377    #[test]
1378    fn test_file_reader_filter_row_groups_and_range() -> Result<()> {
1379        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1380        let origin_reader = SerializedFileReader::new(test_file)?;
1381        let metadata = origin_reader.metadata();
1382        let mid = get_midpoint_offset(metadata.row_group(0));
1383
1384        // true, true predicate
1385        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1386        let read_options = ReadOptionsBuilder::new()
1387            .with_page_index()
1388            .with_predicate(Box::new(|_, _| true))
1389            .with_range(mid, mid + 1)
1390            .build();
1391        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1392        let metadata = reader.metadata();
1393        assert_eq!(metadata.num_row_groups(), 1);
1394        assert_eq!(metadata.column_index().unwrap().len(), 1);
1395        assert_eq!(metadata.offset_index().unwrap().len(), 1);
1396
1397        // true, false predicate
1398        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1399        let read_options = ReadOptionsBuilder::new()
1400            .with_page_index()
1401            .with_predicate(Box::new(|_, _| true))
1402            .with_range(0, mid)
1403            .build();
1404        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1405        let metadata = reader.metadata();
1406        assert_eq!(metadata.num_row_groups(), 0);
1407        assert!(metadata.column_index().is_none());
1408        assert!(metadata.offset_index().is_none());
1409
1410        // false, true predicate
1411        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1412        let read_options = ReadOptionsBuilder::new()
1413            .with_page_index()
1414            .with_predicate(Box::new(|_, _| false))
1415            .with_range(mid, mid + 1)
1416            .build();
1417        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1418        let metadata = reader.metadata();
1419        assert_eq!(metadata.num_row_groups(), 0);
1420        assert!(metadata.column_index().is_none());
1421        assert!(metadata.offset_index().is_none());
1422
1423        // false, false predicate
1424        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1425        let read_options = ReadOptionsBuilder::new()
1426            .with_page_index()
1427            .with_predicate(Box::new(|_, _| false))
1428            .with_range(0, mid)
1429            .build();
1430        let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
1431        let metadata = reader.metadata();
1432        assert_eq!(metadata.num_row_groups(), 0);
1433        assert!(metadata.column_index().is_none());
1434        assert!(metadata.offset_index().is_none());
1435        Ok(())
1436    }
1437
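    // A buffer that ends with the "PAR1" magic but contains corrupt thrift
    // metadata must surface a descriptive error instead of panicking.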
1438    #[test]
1439    fn test_file_reader_invalid_metadata() {
1440        let data = [
1441            255, 172, 1, 0, 50, 82, 65, 73, 1, 0, 0, 0, 169, 168, 168, 162, 87, 255, 16, 0, 0, 0,
1442            80, 65, 82, 49,
1443        ];
1444        let ret = SerializedFileReader::new(Bytes::copy_from_slice(&data));
1445        assert_eq!(
1446            ret.err().unwrap().to_string(),
1447            "Parquet error: Could not parse metadata: bad data"
1448        );
1449    }
1450
1451    #[test]
1452    // Use the Java parquet-tools to get the page index info shown below:
1453    // ```
1454    // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
1455    // row group 0:
1456    // column index for column String:
1457    // Boundary order: ASCENDING
1458    // page-0  :
1459    // null count                 min                                  max
1460    // 0                          Hello                                today
1461    //
1462    // offset index for column String:
1463    // page-0   :
1464    // offset   compressed size       first row index
1465    // 4               152                     0
1466    // ```
1467    //
1468    fn test_page_index_reader() {
1469        let test_file = get_test_file("data_index_bloom_encoding_stats.parquet");
1470        let builder = ReadOptionsBuilder::new();
1471        // enable reading the page index
1472        let options = builder.with_page_index().build();
1473        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1474        let reader = reader_result.unwrap();
1475
1476        // Test contents in Parquet metadata
1477        let metadata = reader.metadata();
1478        assert_eq!(metadata.num_row_groups(), 1);
1479
1480        let column_index = metadata.column_index().unwrap();
1481
1482        // only one row group
1483        assert_eq!(column_index.len(), 1);
1484        let index = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
1485            index
1486        } else {
1487            unreachable!()
1488        };
1489
1490        assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING);
1491        let index_in_pages = &index.indexes;
1492
1493        // only one page in this column's index
1494        assert_eq!(index_in_pages.len(), 1);
1495
1496        let page0 = &index_in_pages[0];
1497        let min = page0.min.as_ref().unwrap();
1498        let max = page0.max.as_ref().unwrap();
1499        assert_eq!(b"Hello", min.as_bytes());
1500        assert_eq!(b"today", max.as_bytes());
1501
1502        let offset_indexes = metadata.offset_index().unwrap();
1503        // only one row group
1504        assert_eq!(offset_indexes.len(), 1);
1505        let offset_index = &offset_indexes[0];
1506        let page_offset = &offset_index[0].page_locations()[0];
1507
1508        assert_eq!(4, page_offset.offset);
1509        assert_eq!(152, page_offset.compressed_page_size);
1510        assert_eq!(0, page_offset.first_row_index);
1511    }
1512
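    // Reading the page indexes must not depend on the order in which the column
    // chunk metadata is supplied: reversing the columns should produce the same
    // per-column indexes, just in reverse order.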
1513    #[test]
1514    fn test_page_index_reader_out_of_order() {
1515        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1516        let options = ReadOptionsBuilder::new().with_page_index().build();
1517        let reader = SerializedFileReader::new_with_options(test_file, options).unwrap();
1518        let metadata = reader.metadata();
1519
1520        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1521        let columns = metadata.row_group(0).columns();
1522        let reversed: Vec<_> = columns.iter().cloned().rev().collect();
1523
1524        let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
1525        let mut b = read_columns_indexes(&test_file, &reversed)
1526            .unwrap()
1527            .unwrap();
1528        b.reverse();
1529        assert_eq!(a, b);
1530
1531        let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
1532        let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
1533        b.reverse();
1534        assert_eq!(a, b);
1535    }
1536
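    // Walks every column of alltypes_tiny_pages_plain.parquet and checks the
    // decoded column index (boundary order, per-page min/max) and offset index
    // (page counts) against the per-column stats quoted in the comments below.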
1537    #[test]
1538    fn test_page_index_reader_all_type() {
1539        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1540        let builder = ReadOptionsBuilder::new();
1541        // enable reading the page index
1542        let options = builder.with_page_index().build();
1543        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1544        let reader = reader_result.unwrap();
1545
1546        // Test contents in Parquet metadata
1547        let metadata = reader.metadata();
1548        assert_eq!(metadata.num_row_groups(), 1);
1549
1550        let column_index = metadata.column_index().unwrap();
1551        let row_group_offset_indexes = &metadata.offset_index().unwrap()[0];
1552
1553        // only one row group
1554        assert_eq!(column_index.len(), 1);
1555        let row_group_metadata = metadata.row_group(0);
1556
1557        //col0->id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 7299, num_nulls: 0]
1558        assert!(!&column_index[0][0].is_sorted());
1559        let boundary_order = &column_index[0][0].get_boundary_order();
1560        assert!(boundary_order.is_some());
1561        assert!(matches!(boundary_order.unwrap(), BoundaryOrder::UNORDERED));
1562        if let Index::INT32(index) = &column_index[0][0] {
1563            check_native_page_index(
1564                index,
1565                325,
1566                get_row_group_min_max_bytes(row_group_metadata, 0),
1567                BoundaryOrder::UNORDERED,
1568            );
1569            assert_eq!(row_group_offset_indexes[0].page_locations.len(), 325);
1570        } else {
1571            unreachable!()
1572        };
1573        //col1->bool_col:BOOLEAN UNCOMPRESSED DO:0 FPO:37329 SZ:3022/3022/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: false, max: true, num_nulls: 0]
1574        assert!(&column_index[0][1].is_sorted());
1575        if let Index::BOOLEAN(index) = &column_index[0][1] {
1576            assert_eq!(index.indexes.len(), 82);
1577            assert_eq!(row_group_offset_indexes[1].page_locations.len(), 82);
1578        } else {
1579            unreachable!()
1580        };
1581        //col2->tinyint_col: INT32 UNCOMPRESSED DO:0 FPO:40351 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
1582        assert!(&column_index[0][2].is_sorted());
1583        if let Index::INT32(index) = &column_index[0][2] {
1584            check_native_page_index(
1585                index,
1586                325,
1587                get_row_group_min_max_bytes(row_group_metadata, 2),
1588                BoundaryOrder::ASCENDING,
1589            );
1590            assert_eq!(row_group_offset_indexes[2].page_locations.len(), 325);
1591        } else {
1592            unreachable!()
1593        };
1594        //col3->smallint_col: INT32 UNCOMPRESSED DO:0 FPO:77676 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
1595        assert!(&column_index[0][3].is_sorted());
1596        if let Index::INT32(index) = &column_index[0][3] {
1597            check_native_page_index(
1598                index,
1599                325,
1600                get_row_group_min_max_bytes(row_group_metadata, 3),
1601                BoundaryOrder::ASCENDING,
1602            );
1603            assert_eq!(row_group_offset_indexes[3].page_locations.len(), 325);
1604        } else {
1605            unreachable!()
1606        };
1607        //col4->int_col: INT32 UNCOMPRESSED VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
1608        assert!(&column_index[0][4].is_sorted());
1609        if let Index::INT32(index) = &column_index[0][4] {
1610            check_native_page_index(
1611                index,
1612                325,
1613                get_row_group_min_max_bytes(row_group_metadata, 4),
1614                BoundaryOrder::ASCENDING,
1615            );
1616            assert_eq!(row_group_offset_indexes[4].page_locations.len(), 325);
1617        } else {
1618            unreachable!()
1619        };
1620        //col5->bigint_col: INT64 UNCOMPRESSED DO:0 FPO:152326 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 90, num_nulls: 0]
1621        assert!(!&column_index[0][5].is_sorted());
1622        if let Index::INT64(index) = &column_index[0][5] {
1623            check_native_page_index(
1624                index,
1625                528,
1626                get_row_group_min_max_bytes(row_group_metadata, 5),
1627                BoundaryOrder::UNORDERED,
1628            );
1629            assert_eq!(row_group_offset_indexes[5].page_locations.len(), 528);
1630        } else {
1631            unreachable!()
1632        };
1633        //col6->float_col: FLOAT UNCOMPRESSED DO:0 FPO:223924 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 9.9, num_nulls: 0]
1634        assert!(&column_index[0][6].is_sorted());
1635        if let Index::FLOAT(index) = &column_index[0][6] {
1636            check_native_page_index(
1637                index,
1638                325,
1639                get_row_group_min_max_bytes(row_group_metadata, 6),
1640                BoundaryOrder::ASCENDING,
1641            );
1642            assert_eq!(row_group_offset_indexes[6].page_locations.len(), 325);
1643        } else {
1644            unreachable!()
1645        };
1646        //col7->double_col: DOUBLE UNCOMPRESSED DO:0 FPO:261249 SZ:71598/71598/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: -0.0, max: 90.89999999999999, num_nulls: 0]
1647        assert!(!&column_index[0][7].is_sorted());
1648        if let Index::DOUBLE(index) = &column_index[0][7] {
1649            check_native_page_index(
1650                index,
1651                528,
1652                get_row_group_min_max_bytes(row_group_metadata, 7),
1653                BoundaryOrder::UNORDERED,
1654            );
1655            assert_eq!(row_group_offset_indexes[7].page_locations.len(), 528);
1656        } else {
1657            unreachable!()
1658        };
1659        //col8->date_string_col: BINARY UNCOMPRESSED DO:0 FPO:332847 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 01/01/09, max: 12/31/10, num_nulls: 0]
1660        assert!(!&column_index[0][8].is_sorted());
1661        if let Index::BYTE_ARRAY(index) = &column_index[0][8] {
1662            check_native_page_index(
1663                index,
1664                974,
1665                get_row_group_min_max_bytes(row_group_metadata, 8),
1666                BoundaryOrder::UNORDERED,
1667            );
1668            assert_eq!(row_group_offset_indexes[8].page_locations.len(), 974);
1669        } else {
1670            unreachable!()
1671        };
1672        //col9->string_col: BINARY UNCOMPRESSED DO:0 FPO:444795 SZ:45298/45298/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 0, max: 9, num_nulls: 0]
1673        assert!(&column_index[0][9].is_sorted());
1674        if let Index::BYTE_ARRAY(index) = &column_index[0][9] {
1675            check_native_page_index(
1676                index,
1677                352,
1678                get_row_group_min_max_bytes(row_group_metadata, 9),
1679                BoundaryOrder::ASCENDING,
1680            );
1681            assert_eq!(row_group_offset_indexes[9].page_locations.len(), 352);
1682        } else {
1683            unreachable!()
1684        };
1685        //col10->timestamp_col: INT96 UNCOMPRESSED DO:0 FPO:490093 SZ:111948/111948/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[num_nulls: 0, min/max not defined]
1686        // Note: per-page min/max values do not exist for this column.
1687        assert!(!&column_index[0][10].is_sorted());
1688        if let Index::NONE = &column_index[0][10] {
1689            assert_eq!(row_group_offset_indexes[10].page_locations.len(), 974);
1690        } else {
1691            unreachable!()
1692        };
1693        //col11->year: INT32 UNCOMPRESSED DO:0 FPO:602041 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2009, max: 2010, num_nulls: 0]
1694        assert!(&column_index[0][11].is_sorted());
1695        if let Index::INT32(index) = &column_index[0][11] {
1696            check_native_page_index(
1697                index,
1698                325,
1699                get_row_group_min_max_bytes(row_group_metadata, 11),
1700                BoundaryOrder::ASCENDING,
1701            );
1702            assert_eq!(row_group_offset_indexes[11].page_locations.len(), 325);
1703        } else {
1704            unreachable!()
1705        };
1706        //col12->month: INT32 UNCOMPRESSED DO:0 FPO:639366 SZ:37325/37325/1.00 VC:7300 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 12, num_nulls: 0]
1707        assert!(!&column_index[0][12].is_sorted());
1708        if let Index::INT32(index) = &column_index[0][12] {
1709            check_native_page_index(
1710                index,
1711                325,
1712                get_row_group_min_max_bytes(row_group_metadata, 12),
1713                BoundaryOrder::UNORDERED,
1714            );
1715            assert_eq!(row_group_offset_indexes[12].page_locations.len(), 325);
1716        } else {
1717            unreachable!()
1718        };
1719    }
1720
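    /// Asserts that a native page index holds `page_count` pages with the given
    /// boundary order, and that every page's min/max lies within the row group's
    /// column-level min/max statistics.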
1721    fn check_native_page_index<T: ParquetValueType>(
1722        row_group_index: &NativeIndex<T>,
1723        page_count: usize,
1724        min_max: (&[u8], &[u8]),
1725        boundary_order: BoundaryOrder,
1726    ) {
1727        assert_eq!(row_group_index.indexes.len(), page_count);
1728        assert_eq!(row_group_index.boundary_order, boundary_order);
1729        assert!(row_group_index.indexes.iter().all(|x| {
1730            x.min.as_ref().unwrap() >= &T::try_from_le_slice(min_max.0).unwrap()
1731                && x.max.as_ref().unwrap() <= &T::try_from_le_slice(min_max.1).unwrap()
1732        }));
1733    }
1734
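    /// Returns the raw (min, max) statistics bytes for column `col_num` of the row group.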
1735    fn get_row_group_min_max_bytes(r: &RowGroupMetaData, col_num: usize) -> (&[u8], &[u8]) {
1736        let statistics = r.column(col_num).statistics().unwrap();
1737        (
1738            statistics.min_bytes_opt().unwrap_or_default(),
1739            statistics.max_bytes_opt().unwrap_or_default(),
1740        )
1741    }
1742
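    // With the offset index loaded, pages can be skipped using their recorded
    // locations; alternately reading and skipping 325 pages must consume them all.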
1743    #[test]
1744    fn test_skip_page_with_offset_index() {
1745        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1746        let builder = ReadOptionsBuilder::new();
1747        // enable reading the page index
1748        let options = builder.with_page_index().build();
1749        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1750        let reader = reader_result.unwrap();
1751
1752        let row_group_reader = reader.get_row_group(0).unwrap();
1753
1754        // use 'int_col': boundary order ASCENDING, 325 pages in total.
1755        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
1756
1757        let mut vec = vec![];
1758
1759        for i in 0..325 {
1760            if i % 2 == 0 {
1761                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
1762            } else {
1763                column_page_reader.skip_next_page().unwrap();
1764            }
1765        }
1766        // check that all pages have been read.
1767        assert!(column_page_reader.peek_next_page().unwrap().is_none());
1768        assert!(column_page_reader.get_next_page().unwrap().is_none());
1769
1770        assert_eq!(vec.len(), 163);
1771    }
1772
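    // Without an offset index the reader falls back to parsing page headers to
    // skip pages; the observable behavior must match the indexed variant above.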
1773    #[test]
1774    fn test_skip_page_without_offset_index() {
1775        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
1776
1777        // use the default SerializedFileReader, without reading the offset index
1778        let reader_result = SerializedFileReader::new(test_file);
1779        let reader = reader_result.unwrap();
1780
1781        let row_group_reader = reader.get_row_group(0).unwrap();
1782
1783        // use 'int_col': boundary order ASCENDING, 325 pages in total.
1784        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
1785
1786        let mut vec = vec![];
1787
1788        for i in 0..325 {
1789            if i % 2 == 0 {
1790                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
1791            } else {
1792                column_page_reader.peek_next_page().unwrap().unwrap();
1793                column_page_reader.skip_next_page().unwrap();
1794            }
1795        }
1796        // check that all pages have been read.
1797        assert!(column_page_reader.peek_next_page().unwrap().is_none());
1798        assert!(column_page_reader.get_next_page().unwrap().is_none());
1799
1800        assert_eq!(vec.len(), 163);
1801    }
1802
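    // Peeking should surface the dictionary page first and, with the offset index
    // loaded, report per-page row counts without consuming any pages.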
1803    #[test]
1804    fn test_peek_page_with_dictionary_page() {
1805        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1806        let builder = ReadOptionsBuilder::new();
1807        // enable reading the page index
1808        let options = builder.with_page_index().build();
1809        let reader_result = SerializedFileReader::new_with_options(test_file, options);
1810        let reader = reader_result.unwrap();
1811        let row_group_reader = reader.get_row_group(0).unwrap();
1812
1813        // use 'string_col': boundary order UNORDERED, 352 data pages plus 1 dictionary page.
1814        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
1815
1816        let mut vec = vec![];
1817
1818        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1819        assert!(meta.is_dict);
1820        let page = column_page_reader.get_next_page().unwrap().unwrap();
1821        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
1822
1823        for i in 0..352 {
1824            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1825            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
1826            // every page holds either 21 or 20 rows, except the last page, which holds 10.
1827            if i != 351 {
1828                assert!((meta.num_rows == Some(21)) || (meta.num_rows == Some(20)));
1829            } else {
1830                // the last page's first row index is 7290 and the total row count is 7300,
1831                // so, with row indexes starting at zero, the last page holds 10 rows.
1832                assert_eq!(meta.num_rows, Some(10));
1833            }
1834            assert!(!meta.is_dict);
1835            vec.push(meta);
1836            let page = column_page_reader.get_next_page().unwrap().unwrap();
1837            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
1838        }
1839
1840        // check that all pages have been read.
1841        assert!(column_page_reader.peek_next_page().unwrap().is_none());
1842        assert!(column_page_reader.get_next_page().unwrap().is_none());
1843
1844        assert_eq!(vec.len(), 352);
1845    }
1846
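    // Same as above but without the offset index: peeking parses page headers,
    // so the metadata exposes num_levels rather than num_rows.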
1847    #[test]
1848    fn test_peek_page_with_dictionary_page_without_offset_index() {
1849        let test_file = get_test_file("alltypes_tiny_pages.parquet");
1850
1851        let reader_result = SerializedFileReader::new(test_file);
1852        let reader = reader_result.unwrap();
1853        let row_group_reader = reader.get_row_group(0).unwrap();
1854
1855        // use 'string_col': boundary order UNORDERED, 352 data pages plus 1 dictionary page.
1856        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
1857
1858        let mut vec = vec![];
1859
1860        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1861        assert!(meta.is_dict);
1862        let page = column_page_reader.get_next_page().unwrap().unwrap();
1863        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
1864
1865        for i in 0..352 {
1866            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
1867            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
1868            // every page holds either 21 or 20 levels, except the last page, which holds 10.
1869            if i != 351 {
1870                assert!((meta.num_levels == Some(21)) || (meta.num_levels == Some(20)));
1871            } else {
1872                // the last page's first row index is 7290 and the total row count is 7300,
1873                // so, with row indexes starting at zero, the last page holds 10 levels.
1874                assert_eq!(meta.num_levels, Some(10));
1875            }
1876            assert!(!meta.is_dict);
1877            vec.push(meta);
1878            let page = column_page_reader.get_next_page().unwrap().unwrap();
1879            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
1880        }
1881
1882        // check that all pages have been read.
1883        assert!(column_page_reader.peek_next_page().unwrap().is_none());
1884        assert!(column_page_reader.get_next_page().unwrap().is_none());
1885
1886        assert_eq!(vec.len(), 352);
1887    }
1888
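    // Round-trips a nullable FIXED_LEN_BYTE_ARRAY column and checks that the
    // written column index records the page's null count and min/max values.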
1889    #[test]
1890    fn test_fixed_length_index() {
1891        let message_type = "
1892        message test_schema {
1893          OPTIONAL FIXED_LEN_BYTE_ARRAY (11) value (DECIMAL(25,2));
1894        }
1895        ";
1896
1897        let schema = parse_message_type(message_type).unwrap();
1898        let mut out = Vec::with_capacity(1024);
1899        let mut writer =
1900            SerializedFileWriter::new(&mut out, Arc::new(schema), Default::default()).unwrap();
1901
1902        let mut r = writer.next_row_group().unwrap();
1903        let mut c = r.next_column().unwrap().unwrap();
1904        c.typed::<FixedLenByteArrayType>()
1905            .write_batch(
1906                &[vec![0; 11].into(), vec![5; 11].into(), vec![3; 11].into()],
1907                Some(&[1, 1, 0, 1]),
1908                None,
1909            )
1910            .unwrap();
1911        c.close().unwrap();
1912        r.close().unwrap();
1913        writer.close().unwrap();
1914
1915        let b = Bytes::from(out);
1916        let options = ReadOptionsBuilder::new().with_page_index().build();
1917        let reader = SerializedFileReader::new_with_options(b, options).unwrap();
1918        let index = reader.metadata().column_index().unwrap();
1919
1920        // 1 row group
1921        assert_eq!(index.len(), 1);
1922        let c = &index[0];
1923        // 1 column
1924        assert_eq!(c.len(), 1);
1925
1926        match &c[0] {
1927            Index::FIXED_LEN_BYTE_ARRAY(v) => {
1928                assert_eq!(v.indexes.len(), 1);
1929                let page_idx = &v.indexes[0];
1930                assert_eq!(page_idx.null_count.unwrap(), 1);
1931                assert_eq!(page_idx.min.as_ref().unwrap().as_ref(), &[0; 11]);
1932                assert_eq!(page_idx.max.as_ref().unwrap().as_ref(), &[5; 11]);
1933            }
1934            _ => unreachable!(),
1935        }
1936    }
1937
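    // A GZIP-compressed page may consist of several concatenated gzip members;
    // the decompressor must read all members, not just the first.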
1938    #[test]
1939    fn test_multi_gz() {
1940        let file = get_test_file("concatenated_gzip_members.parquet");
1941        let reader = SerializedFileReader::new(file).unwrap();
1942        let row_group_reader = reader.get_row_group(0).unwrap();
1943        match row_group_reader.get_column_reader(0).unwrap() {
1944            ColumnReader::Int64ColumnReader(mut reader) => {
1945                let mut buffer = Vec::with_capacity(1024);
1946                let mut def_levels = Vec::with_capacity(1024);
1947                let (num_records, num_values, num_levels) = reader
1948                    .read_records(1024, Some(&mut def_levels), None, &mut buffer)
1949                    .unwrap();
1950
1951                assert_eq!(num_records, 513);
1952                assert_eq!(num_values, 513);
1953                assert_eq!(num_levels, 513);
1954
1955                let expected: Vec<i64> = (1..514).collect();
1956                assert_eq!(&buffer, &expected);
1957            }
1958            _ => unreachable!(),
1959        }
1960    }
1961
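    // The test file pairs a PLAIN-encoded column with a BYTE_STREAM_SPLIT-encoded
    // twin for each type; every adjacent pair must decode to identical values.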
1962    #[test]
1963    fn test_byte_stream_split_extended() {
1964        let path = format!(
1965            "{}/byte_stream_split_extended.gzip.parquet",
1966            arrow::util::test_util::parquet_test_data(),
1967        );
1968        let file = File::open(path).unwrap();
1969        let reader = Box::new(SerializedFileReader::new(file).expect("Failed to create reader"));
1970
1971        // Use full schema as projected schema
1972        let mut iter = reader
1973            .get_row_iter(None)
1974            .expect("Failed to create row iterator");
1975
1976        let mut start = 0;
1977        let end = reader.metadata().file_metadata().num_rows();
1978
1979        let check_row = |row: Result<Row, ParquetError>| {
1980            assert!(row.is_ok());
1981            let r = row.unwrap();
1982            assert_eq!(r.get_float16(0).unwrap(), r.get_float16(1).unwrap());
1983            assert_eq!(r.get_float(2).unwrap(), r.get_float(3).unwrap());
1984            assert_eq!(r.get_double(4).unwrap(), r.get_double(5).unwrap());
1985            assert_eq!(r.get_int(6).unwrap(), r.get_int(7).unwrap());
1986            assert_eq!(r.get_long(8).unwrap(), r.get_long(9).unwrap());
1987            assert_eq!(r.get_bytes(10).unwrap(), r.get_bytes(11).unwrap());
1988            assert_eq!(r.get_decimal(12).unwrap(), r.get_decimal(13).unwrap());
1989        };
1990
1991        while start < end {
1992            match iter.next() {
1993                Some(row) => check_row(row),
1994                None => break,
1995            };
1996            start += 1;
1997        }
1998    }
1999
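    // When a predicate drops row groups, the surviving column/offset indexes must
    // stay aligned with the retained row groups, including non-contiguous selections.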
2000    #[test]
2001    fn test_filtered_rowgroup_metadata() {
2002        let message_type = "
2003            message test_schema {
2004                REQUIRED INT32 a;
2005            }
2006        ";
2007        let schema = Arc::new(parse_message_type(message_type).unwrap());
2008        let props = Arc::new(
2009            WriterProperties::builder()
2010                .set_statistics_enabled(EnabledStatistics::Page)
2011                .build(),
2012        );
2013        let mut file: File = tempfile::tempfile().unwrap();
2014        let mut file_writer = SerializedFileWriter::new(&mut file, schema, props).unwrap();
2015        let data = [1, 2, 3, 4, 5];
2016
2017        // write 5 row groups
2018        for idx in 0..5 {
2019            let data_i: Vec<i32> = data.iter().map(|x| x * (idx + 1)).collect();
2020            let mut row_group_writer = file_writer.next_row_group().unwrap();
2021            if let Some(mut writer) = row_group_writer.next_column().unwrap() {
2022                writer
2023                    .typed::<Int32Type>()
2024                    .write_batch(data_i.as_slice(), None, None)
2025                    .unwrap();
2026                writer.close().unwrap();
2027            }
2028            row_group_writer.close().unwrap();
2029            file_writer.flushed_row_groups();
2030        }
2031        let file_metadata = file_writer.close().unwrap();
2032
2033        assert_eq!(file_metadata.num_rows, 25);
2034        assert_eq!(file_metadata.row_groups.len(), 5);
2035
2036        // read only the 3rd row group
2037        let read_options = ReadOptionsBuilder::new()
2038            .with_page_index()
2039            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) == 2))
2040            .build();
2041        let reader =
2042            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2043                .unwrap();
2044        let metadata = reader.metadata();
2045
2046        // check we got the expected row group
2047        assert_eq!(metadata.num_row_groups(), 1);
2048        assert_eq!(metadata.row_group(0).ordinal(), Some(2));
2049
2050        // check we only got the relevant page indexes
2051        assert!(metadata.column_index().is_some());
2052        assert!(metadata.offset_index().is_some());
2053        assert_eq!(metadata.column_index().unwrap().len(), 1);
2054        assert_eq!(metadata.offset_index().unwrap().len(), 1);
2055        let col_idx = metadata.column_index().unwrap();
2056        let off_idx = metadata.offset_index().unwrap();
2057        let col_stats = metadata.row_group(0).column(0).statistics().unwrap();
2058        let pg_idx = &col_idx[0][0];
2059        let off_idx_i = &off_idx[0][0];
2060
2061        // test that we got the index matching the row group
2062        match pg_idx {
2063            Index::INT32(int_idx) => {
2064                let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2065                let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2066                assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2067                assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2068            }
2069            _ => panic!("wrong stats type"),
2070        }
2071
2072        // check offset index matches too
2073        assert_eq!(
2074            off_idx_i.page_locations[0].offset,
2075            metadata.row_group(0).column(0).data_page_offset()
2076        );
2077
2078        // read non-contiguous row groups
2079        let read_options = ReadOptionsBuilder::new()
2080            .with_page_index()
2081            .with_predicate(Box::new(|rgmeta, _| rgmeta.ordinal().unwrap_or(0) % 2 == 1))
2082            .build();
2083        let reader =
2084            SerializedFileReader::new_with_options(file.try_clone().unwrap(), read_options)
2085                .unwrap();
2086        let metadata = reader.metadata();
2087
2088        // check we got the expected row groups
2089        assert_eq!(metadata.num_row_groups(), 2);
2090        assert_eq!(metadata.row_group(0).ordinal(), Some(1));
2091        assert_eq!(metadata.row_group(1).ordinal(), Some(3));
2092
2093        // check we only got the relevant page indexes
2094        assert!(metadata.column_index().is_some());
2095        assert!(metadata.offset_index().is_some());
2096        assert_eq!(metadata.column_index().unwrap().len(), 2);
2097        assert_eq!(metadata.offset_index().unwrap().len(), 2);
2098        let col_idx = metadata.column_index().unwrap();
2099        let off_idx = metadata.offset_index().unwrap();
2100
2101        for (i, col_idx_i) in col_idx.iter().enumerate().take(metadata.num_row_groups()) {
2102            let col_stats = metadata.row_group(i).column(0).statistics().unwrap();
2103            let pg_idx = &col_idx_i[0];
2104            let off_idx_i = &off_idx[i][0];
2105
2106            // test that we got the index matching the row group
2107            match pg_idx {
2108                Index::INT32(int_idx) => {
2109                    let min = col_stats.min_bytes_opt().unwrap().get_i32_le();
2110                    let max = col_stats.max_bytes_opt().unwrap().get_i32_le();
2111                    assert_eq!(int_idx.indexes[0].min(), Some(min).as_ref());
2112                    assert_eq!(int_idx.indexes[0].max(), Some(max).as_ref());
2113                }
2114                _ => panic!("wrong stats type"),
2115            }
2116
2117            // check offset index matches too
2118            assert_eq!(
2119                off_idx_i.page_locations[0].offset,
2120                metadata.row_group(i).column(0).data_page_offset()
2121            );
2122        }
2123    }
2124}