// parquet/arrow/array_reader/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for reading into arrow arrays: [`ArrayReader`] and [`RowGroups`]
19
20use crate::errors::Result;
21use arrow_array::ArrayRef;
22use arrow_schema::DataType as ArrowType;
23use std::any::Any;
24use std::sync::Arc;
25
26use crate::arrow::record_reader::GenericRecordReader;
27use crate::arrow::record_reader::buffer::ValuesBuffer;
28use crate::column::page::PageIterator;
29use crate::column::reader::decoder::ColumnValueDecoder;
30use crate::file::metadata::ParquetMetaData;
31use crate::file::reader::{FilePageIterator, FileReader};
32
33mod builder;
34mod byte_array;
35mod byte_array_dictionary;
36mod byte_view_array;
37mod cached_array_reader;
38mod empty_array;
39mod fixed_len_byte_array;
40mod fixed_size_list_array;
41mod list_array;
42mod list_view_array;
43mod map_array;
44mod null_array;
45mod primitive_array;
46mod row_group_cache;
47mod row_group_index;
48mod row_number;
49mod struct_array;
50
51#[cfg(test)]
52mod test_util;
53
54// Note that this crate is public under the `experimental` feature flag.
55use crate::file::metadata::RowGroupMetaData;
56pub use builder::{ArrayReaderBuilder, CacheOptions, CacheOptionsBuilder};
57pub use byte_array::make_byte_array_reader;
58pub use byte_array_dictionary::make_byte_array_dictionary_reader;
59#[allow(unused_imports)] // Only used for benchmarks
60pub use byte_view_array::make_byte_view_array_reader;
61#[allow(unused_imports)] // Only used for benchmarks
62pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
63pub use fixed_size_list_array::FixedSizeListArrayReader;
64pub use list_array::ListArrayReader;
65pub use list_view_array::ListViewArrayReader;
66pub use map_array::MapArrayReader;
67pub use null_array::NullArrayReader;
68pub use primitive_array::PrimitiveArrayReader;
69pub use row_group_cache::RowGroupCache;
70pub use struct_array::StructArrayReader;
71
/// Reads Parquet data into Arrow Arrays.
///
/// This is an internal implementation detail of the Parquet reader, and is not
/// intended for public use.
///
/// This is the core trait for reading encoded Parquet data directly into Arrow
/// Arrays efficiently. There are various specializations of this trait for
/// different combinations of encodings and arrays, such as
/// [`PrimitiveArrayReader`], [`ListArrayReader`], etc.
///
/// Each `ArrayReader` logically contains the following state
/// 1. A handle to the encoded Parquet data
/// 2. An in progress buffered Array
///
/// Data can either be read in batches using [`ArrayReader::next_batch`] or
/// incrementally using [`ArrayReader::read_records`] and [`ArrayReader::skip_records`].
pub trait ArrayReader: Send {
    // TODO: this function is never used, and the trait is not public. Perhaps this should be
    // removed.
    #[allow(dead_code)]
    fn as_any(&self) -> &dyn Any;

    /// Returns the arrow type of this array reader.
    fn get_data_type(&self) -> &ArrowType;

    /// Reads at most `batch_size` records into an arrow array and return it.
    ///
    /// Convenience wrapper equivalent to calling [`ArrayReader::read_records`]
    /// followed by [`ArrayReader::consume_batch`].
    #[cfg(any(feature = "experimental", test))]
    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
        self.read_records(batch_size)?;
        self.consume_batch()
    }

    /// Reads at most `batch_size` records' bytes into buffer
    ///
    /// Returns the number of records read, which can be less than `batch_size` if
    /// the pages are exhausted.
    fn read_records(&mut self, batch_size: usize) -> Result<usize>;

    /// Consume all currently stored buffer data
    /// into an arrow array and return it.
    fn consume_batch(&mut self) -> Result<ArrayRef>;

    /// Skips over `num_records` records, returning the number of rows skipped
    ///
    /// Note that calling `skip_records` with large values of `num_records` is
    /// efficient as it avoids decoding data into the in-progress array.
    /// However, there is overhead to calling this function, so for small values of
    /// `num_records`, it can be more efficient to call `read_records` and apply
    /// a filter to the resulting array.
    fn skip_records(&mut self, num_records: usize) -> Result<usize>;

    /// If this array has a non-zero definition level, i.e. has a nullable parent
    /// array, returns the definition levels of data from the last call of `next_batch`
    ///
    /// Otherwise returns None
    ///
    /// This is used by parent [`ArrayReader`] to compute their null bitmaps
    fn get_def_levels(&self) -> Option<&[i16]>;

    /// If this array has a non-zero repetition level, i.e. has a repeated parent
    /// array, returns the repetition levels of data from the last call of `next_batch`
    ///
    /// Otherwise returns None
    ///
    /// This is used by parent [`ArrayReader`] to compute their array offsets
    fn get_rep_levels(&self) -> Option<&[i16]>;
}
139
/// Interface for reading data pages from the columns of one or more RowGroups.
pub trait RowGroups {
    /// Get the total number of rows in this collection
    fn num_rows(&self) -> usize;

    /// Returns a [`PageIterator`] for all pages in the specified column chunk
    /// across all row groups in this collection.
    ///
    /// `i` is the index of the column (leaf) within the schema.
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;

    /// Returns an iterator over the row groups in this collection
    ///
    /// Note this may not include all row groups in [`Self::metadata`].
    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_>;

    /// Returns the parquet metadata
    fn metadata(&self) -> &ParquetMetaData;
}
157
158impl RowGroups for Arc<dyn FileReader> {
159 fn num_rows(&self) -> usize {
160 FileReader::metadata(self.as_ref())
161 .file_metadata()
162 .num_rows() as usize
163 }
164
165 fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
166 let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
167 Ok(Box::new(iterator))
168 }
169
170 fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
171 Box::new(FileReader::metadata(self.as_ref()).row_groups().iter())
172 }
173
174 fn metadata(&self) -> &ParquetMetaData {
175 FileReader::metadata(self.as_ref())
176 }
177}
178
179/// Uses `record_reader` to read up to `batch_size` records from `pages`
180///
181/// Returns the number of records read, which can be less than `batch_size` if
182/// pages is exhausted.
183fn read_records<V, CV>(
184 record_reader: &mut GenericRecordReader<V, CV>,
185 pages: &mut dyn PageIterator,
186 batch_size: usize,
187) -> Result<usize>
188where
189 V: ValuesBuffer,
190 CV: ColumnValueDecoder<Buffer = V>,
191{
192 let mut records_read = 0usize;
193 while records_read < batch_size {
194 let records_to_read = batch_size - records_read;
195
196 let records_read_once = record_reader.read_records(records_to_read)?;
197 records_read += records_read_once;
198
199 // Record reader exhausted
200 if records_read_once < records_to_read {
201 if let Some(page_reader) = pages.next() {
202 // Read from new page reader (i.e. column chunk)
203 record_reader.set_page_reader(page_reader?)?;
204 } else {
205 // Page reader also exhausted
206 break;
207 }
208 }
209 }
210 Ok(records_read)
211}
212
213/// Uses `record_reader` to skip up to `batch_size` records from `pages`
214///
215/// Returns the number of records skipped, which can be less than `batch_size` if
216/// pages is exhausted
217fn skip_records<V, CV>(
218 record_reader: &mut GenericRecordReader<V, CV>,
219 pages: &mut dyn PageIterator,
220 batch_size: usize,
221) -> Result<usize>
222where
223 V: ValuesBuffer,
224 CV: ColumnValueDecoder<Buffer = V>,
225{
226 let mut records_skipped = 0usize;
227 while records_skipped < batch_size {
228 let records_to_read = batch_size - records_skipped;
229
230 let records_skipped_once = record_reader.skip_records(records_to_read)?;
231 records_skipped += records_skipped_once;
232
233 // Record reader exhausted
234 if records_skipped_once < records_to_read {
235 if let Some(page_reader) = pages.next() {
236 // Read from new page reader (i.e. column chunk)
237 record_reader.set_page_reader(page_reader?)?;
238 } else {
239 // Page reader also exhausted
240 break;
241 }
242 }
243 }
244 Ok(records_skipped)
245}