parquet/arrow/array_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Logic for reading into arrow arrays
19
20use crate::errors::Result;
21use arrow_array::ArrayRef;
22use arrow_schema::DataType as ArrowType;
23use std::any::Any;
24use std::sync::Arc;
25
26use crate::arrow::record_reader::buffer::ValuesBuffer;
27use crate::arrow::record_reader::GenericRecordReader;
28use crate::column::page::PageIterator;
29use crate::column::reader::decoder::ColumnValueDecoder;
30use crate::file::reader::{FilePageIterator, FileReader};
31
32mod builder;
33mod byte_array;
34mod byte_array_dictionary;
35mod byte_view_array;
36mod empty_array;
37mod fixed_len_byte_array;
38mod fixed_size_list_array;
39mod list_array;
40mod map_array;
41mod null_array;
42mod primitive_array;
43mod struct_array;
44
45#[cfg(test)]
46mod test_util;
47
48// Note that this crate is public under the `experimental` feature flag.
49pub use builder::ArrayReaderBuilder;
50pub use byte_array::make_byte_array_reader;
51pub use byte_array_dictionary::make_byte_array_dictionary_reader;
52#[allow(unused_imports)] // Only used for benchmarks
53pub use byte_view_array::make_byte_view_array_reader;
54#[allow(unused_imports)] // Only used for benchmarks
55pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
56pub use fixed_size_list_array::FixedSizeListArrayReader;
57pub use list_array::ListArrayReader;
58pub use map_array::MapArrayReader;
59pub use null_array::NullArrayReader;
60pub use primitive_array::PrimitiveArrayReader;
61pub use struct_array::StructArrayReader;
62
/// Array reader reads parquet data into arrow array.
pub trait ArrayReader: Send {
    // TODO: this function is never used, and the trait is not public. Perhaps this should be
    // removed.
    #[allow(dead_code)]
    fn as_any(&self) -> &dyn Any;

    /// Returns the arrow type of this array reader.
    fn get_data_type(&self) -> &ArrowType;

    /// Reads at most `batch_size` records into an arrow array and return it.
    ///
    /// Provided convenience wrapper: equivalent to [`Self::read_records`]
    /// followed by [`Self::consume_batch`]. Only compiled for tests and the
    /// `experimental` feature.
    #[cfg(any(feature = "experimental", test))]
    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
        self.read_records(batch_size)?;
        self.consume_batch()
    }

    /// Reads at most `batch_size` records' bytes into buffer
    ///
    /// Returns the number of records read, which can be less than `batch_size` if
    /// pages is exhausted.
    fn read_records(&mut self, batch_size: usize) -> Result<usize>;

    /// Consume all currently stored buffer data
    /// into an arrow array and return it.
    fn consume_batch(&mut self) -> Result<ArrayRef>;

    /// Skips over `num_records` records, returning the number of rows skipped
    fn skip_records(&mut self, num_records: usize) -> Result<usize>;

    /// If this array has a non-zero definition level, i.e. has a nullable parent
    /// array, returns the definition levels of data from the last call of `next_batch`
    ///
    /// Otherwise returns None
    ///
    /// This is used by parent [`ArrayReader`] to compute their null bitmaps
    fn get_def_levels(&self) -> Option<&[i16]>;

    /// If this array has a non-zero repetition level, i.e. has a repeated parent
    /// array, returns the repetition levels of data from the last call of `next_batch`
    ///
    /// Otherwise returns None
    ///
    /// This is used by parent [`ArrayReader`] to compute their array offsets
    fn get_rep_levels(&self) -> Option<&[i16]>;
}
109
/// A collection of row groups
pub trait RowGroups {
    /// Get the number of rows in this collection
    fn num_rows(&self) -> usize;

    /// Returns a [`PageIterator`] for all pages in the specified column chunk
    /// across all row groups in this collection.
    ///
    /// `i` is the leaf column index within the parquet schema.
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}
119
120impl RowGroups for Arc<dyn FileReader> {
121    fn num_rows(&self) -> usize {
122        self.metadata().file_metadata().num_rows() as usize
123    }
124
125    fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
126        let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
127        Ok(Box::new(iterator))
128    }
129}
130
131/// Uses `record_reader` to read up to `batch_size` records from `pages`
132///
133/// Returns the number of records read, which can be less than `batch_size` if
134/// pages is exhausted.
135fn read_records<V, CV>(
136    record_reader: &mut GenericRecordReader<V, CV>,
137    pages: &mut dyn PageIterator,
138    batch_size: usize,
139) -> Result<usize>
140where
141    V: ValuesBuffer,
142    CV: ColumnValueDecoder<Buffer = V>,
143{
144    let mut records_read = 0usize;
145    while records_read < batch_size {
146        let records_to_read = batch_size - records_read;
147
148        let records_read_once = record_reader.read_records(records_to_read)?;
149        records_read += records_read_once;
150
151        // Record reader exhausted
152        if records_read_once < records_to_read {
153            if let Some(page_reader) = pages.next() {
154                // Read from new page reader (i.e. column chunk)
155                record_reader.set_page_reader(page_reader?)?;
156            } else {
157                // Page reader also exhausted
158                break;
159            }
160        }
161    }
162    Ok(records_read)
163}
164
165/// Uses `record_reader` to skip up to `batch_size` records from `pages`
166///
167/// Returns the number of records skipped, which can be less than `batch_size` if
168/// pages is exhausted
169fn skip_records<V, CV>(
170    record_reader: &mut GenericRecordReader<V, CV>,
171    pages: &mut dyn PageIterator,
172    batch_size: usize,
173) -> Result<usize>
174where
175    V: ValuesBuffer,
176    CV: ColumnValueDecoder<Buffer = V>,
177{
178    let mut records_skipped = 0usize;
179    while records_skipped < batch_size {
180        let records_to_read = batch_size - records_skipped;
181
182        let records_skipped_once = record_reader.skip_records(records_to_read)?;
183        records_skipped += records_skipped_once;
184
185        // Record reader exhausted
186        if records_skipped_once < records_to_read {
187            if let Some(page_reader) = pages.next() {
188                // Read from new page reader (i.e. column chunk)
189                record_reader.set_page_reader(page_reader?)?;
190            } else {
191                // Page reader also exhausted
192                break;
193            }
194        }
195    }
196    Ok(records_skipped)
197}