parquet/arrow/array_reader/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Logic for reading into arrow arrays: [`ArrayReader`] and [`RowGroups`]

use crate::errors::Result;
use arrow_array::ArrayRef;
use arrow_schema::DataType as ArrowType;
use std::any::Any;
use std::sync::Arc;

use crate::arrow::record_reader::buffer::ValuesBuffer;
use crate::arrow::record_reader::GenericRecordReader;
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::file::reader::{FilePageIterator, FileReader};

mod builder;
mod byte_array;
mod byte_array_dictionary;
mod byte_view_array;
mod cached_array_reader;
mod empty_array;
mod fixed_len_byte_array;
mod fixed_size_list_array;
mod list_array;
mod map_array;
mod null_array;
mod primitive_array;
mod row_group_cache;
mod struct_array;

#[cfg(test)]
mod test_util;

// Note that this module is public under the `experimental` feature flag.
pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
#[allow(unused_imports)] // Only used for benchmarks
pub use byte_view_array::make_byte_view_array_reader;
#[allow(unused_imports)] // Only used for benchmarks
pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
pub use fixed_size_list_array::FixedSizeListArrayReader;
pub use list_array::ListArrayReader;
pub use map_array::MapArrayReader;
pub use null_array::NullArrayReader;
pub use primitive_array::PrimitiveArrayReader;
pub use row_group_cache::RowGroupCache;
pub use struct_array::StructArrayReader;

/// Reads Parquet data into Arrow Arrays.
///
/// This is an internal implementation detail of the Parquet reader, and is not
/// intended for public use.
///
/// This is the core trait for reading encoded Parquet data directly into Arrow
/// Arrays efficiently. There are various specializations of this trait for
/// different combinations of encodings and arrays, such as
/// [`PrimitiveArrayReader`], [`ListArrayReader`], etc.
///
/// Each `ArrayReader` logically contains the following state:
/// 1. A handle to the encoded Parquet data
/// 2. An in-progress buffered array
///
/// Data can either be read in batches using [`ArrayReader::next_batch`] or
/// incrementally using [`ArrayReader::read_records`] and [`ArrayReader::skip_records`].
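///
/// # Example (sketch)
///
/// A minimal sketch of the incremental protocol, assuming `reader` is an
/// existing `Box<dyn ArrayReader>` obtained elsewhere (e.g. from an
/// [`ArrayReaderBuilder`]):
///
/// ```ignore
/// // Buffer up to 1024 records into the reader's in-progress array.
/// let records_read = reader.read_records(1024)?;
/// // Convert everything buffered so far into a single Arrow array. For a
/// // flat (non-nested) column this yields one element per record read.
/// let array: ArrayRef = reader.consume_batch()?;
/// ```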
pub trait ArrayReader: Send {
    // TODO: this function is never used, and the trait is not public. Perhaps this should be
    // removed.
    #[allow(dead_code)]
    fn as_any(&self) -> &dyn Any;

    /// Returns the arrow type of this array reader.
    fn get_data_type(&self) -> &ArrowType;

    /// Reads at most `batch_size` records into an arrow array and returns it.
    #[cfg(any(feature = "experimental", test))]
    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
        self.read_records(batch_size)?;
        self.consume_batch()
    }

    /// Reads at most `batch_size` records into the internal buffer
    ///
    /// Returns the number of records read, which can be less than `batch_size` if
    /// the pages are exhausted.
    fn read_records(&mut self, batch_size: usize) -> Result<usize>;

    /// Consumes all currently buffered data into an arrow array and returns it.
    fn consume_batch(&mut self) -> Result<ArrayRef>;

    /// Skips over `num_records` records, returning the number of rows skipped
    ///
    /// Note that calling `skip_records` with large values of `num_records` is
    /// efficient as it avoids decoding data into the in-progress array.
    /// However, there is overhead to calling this function, so for small values of
    /// `num_records` it can be more efficient to call `read_records` and apply
    /// a filter to the resulting array.
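    ///
    /// For example, to read only rows `1000..1100` of a column, one might
    /// combine the two calls as in this sketch (assuming `reader` is
    /// positioned at row 0):
    ///
    /// ```ignore
    /// reader.skip_records(1000)?;  // skip rows 0..1000 without decoding them
    /// reader.read_records(100)?;   // buffer rows 1000..1100
    /// let array = reader.consume_batch()?;
    /// ```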
    fn skip_records(&mut self, num_records: usize) -> Result<usize>;

    /// If this array has a non-zero definition level, i.e. has a nullable parent
    /// array, returns the definition levels of data from the last call of `next_batch`
    ///
    /// Otherwise returns None
    ///
    /// This is used by the parent [`ArrayReader`] to compute its null bitmap
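    ///
    /// For example, for a top-level optional INT32 column containing the
    /// values `1, NULL, 3`, the definition levels would be `[1, 0, 1]`.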
    fn get_def_levels(&self) -> Option<&[i16]>;

    /// If this array has a non-zero repetition level, i.e. has a repeated parent
    /// array, returns the repetition levels of data from the last call of `next_batch`
    ///
    /// Otherwise returns None
    ///
    /// This is used by the parent [`ArrayReader`] to compute its array offsets
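    ///
    /// For example, for the leaf values of a list column containing the rows
    /// `[1, 2]` and `[3]`, the repetition levels would be `[0, 1, 0]`: a
    /// repetition level of `0` marks the first value of each new row.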
    fn get_rep_levels(&self) -> Option<&[i16]>;
}

/// Interface for reading data pages from the columns of one or more row groups.
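///
/// For example (a sketch, assuming `file_reader` is an `Arc<dyn FileReader>`,
/// which implements this trait below):
///
/// ```ignore
/// let total_rows = file_reader.num_rows();
/// // Iterate all pages of the first column across every row group.
/// let mut pages = file_reader.column_chunks(0)?;
/// while let Some(page_reader) = pages.next() {
///     let page_reader = page_reader?;
///     // ... feed `page_reader` to a record reader
/// }
/// ```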
pub trait RowGroups {
    /// Get the number of rows in this collection
    fn num_rows(&self) -> usize;

    /// Returns a [`PageIterator`] for all pages in the specified column chunk
    /// across all row groups in this collection.
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}

impl RowGroups for Arc<dyn FileReader> {
    fn num_rows(&self) -> usize {
        self.metadata().file_metadata().num_rows() as usize
    }

    fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
        let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
        Ok(Box::new(iterator))
    }
}

/// Uses `record_reader` to read up to `batch_size` records from `pages`
///
/// Returns the number of records read, which can be less than `batch_size` if
/// `pages` is exhausted.
fn read_records<V, CV>(
    record_reader: &mut GenericRecordReader<V, CV>,
    pages: &mut dyn PageIterator,
    batch_size: usize,
) -> Result<usize>
where
    V: ValuesBuffer,
    CV: ColumnValueDecoder<Buffer = V>,
{
    let mut records_read = 0usize;
    while records_read < batch_size {
        let records_to_read = batch_size - records_read;

        let records_read_once = record_reader.read_records(records_to_read)?;
        records_read += records_read_once;

        // Record reader exhausted
        if records_read_once < records_to_read {
            if let Some(page_reader) = pages.next() {
                // Read from new page reader (i.e. column chunk)
                record_reader.set_page_reader(page_reader?)?;
            } else {
                // Page reader also exhausted
                break;
            }
        }
    }
    Ok(records_read)
}

/// Uses `record_reader` to skip up to `batch_size` records from `pages`
///
/// Returns the number of records skipped, which can be less than `batch_size` if
/// `pages` is exhausted.
fn skip_records<V, CV>(
    record_reader: &mut GenericRecordReader<V, CV>,
    pages: &mut dyn PageIterator,
    batch_size: usize,
) -> Result<usize>
where
    V: ValuesBuffer,
    CV: ColumnValueDecoder<Buffer = V>,
{
    let mut records_skipped = 0usize;
    while records_skipped < batch_size {
        let records_to_skip = batch_size - records_skipped;

        let records_skipped_once = record_reader.skip_records(records_to_skip)?;
        records_skipped += records_skipped_once;

        // Record reader exhausted
        if records_skipped_once < records_to_skip {
            if let Some(page_reader) = pages.next() {
                // Advance to the next page reader (i.e. column chunk)
                record_reader.set_page_reader(page_reader?)?;
            } else {
                // Page reader also exhausted
                break;
            }
        }
    }
    Ok(records_skipped)
}