// parquet/arrow/array_reader/builder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::{Arc, Mutex};
19
20use arrow_schema::{DataType, Fields, SchemaBuilder};
21
22use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
23use crate::arrow::array_reader::cached_array_reader::CacheRole;
24use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
25use crate::arrow::array_reader::empty_array::make_empty_array_reader;
26use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
27use crate::arrow::array_reader::row_group_cache::RowGroupCache;
28use crate::arrow::array_reader::{
29    make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
30    FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
31    PrimitiveArrayReader, RowGroups, StructArrayReader,
32};
33use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
34use crate::arrow::schema::{ParquetField, ParquetFieldType};
35use crate::arrow::ProjectionMask;
36use crate::basic::Type as PhysicalType;
37use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
38use crate::errors::{ParquetError, Result};
39use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
40
41/// Builder for [`CacheOptions`]
42#[derive(Debug, Clone)]
43pub struct CacheOptionsBuilder<'a> {
44    /// Projection mask to apply to the cache
45    pub projection_mask: &'a ProjectionMask,
46    /// Cache to use for storing row groups
47    pub cache: Arc<Mutex<RowGroupCache>>,
48}
49
50impl<'a> CacheOptionsBuilder<'a> {
51    /// create a new cache options builder
52    pub fn new(projection_mask: &'a ProjectionMask, cache: Arc<Mutex<RowGroupCache>>) -> Self {
53        Self {
54            projection_mask,
55            cache,
56        }
57    }
58
59    /// Return a new [`CacheOptions`] for producing (populating) the cache
60    pub fn producer(self) -> CacheOptions<'a> {
61        CacheOptions {
62            projection_mask: self.projection_mask,
63            cache: self.cache,
64            role: CacheRole::Producer,
65        }
66    }
67
68    /// return a new [`CacheOptions`] for consuming (reading) the cache
69    pub fn consumer(self) -> CacheOptions<'a> {
70        CacheOptions {
71            projection_mask: self.projection_mask,
72            cache: self.cache,
73            role: CacheRole::Consumer,
74        }
75    }
76}
77
78/// Cache options containing projection mask, cache, and role
79#[derive(Clone)]
80pub struct CacheOptions<'a> {
81    pub projection_mask: &'a ProjectionMask,
82    pub cache: Arc<Mutex<RowGroupCache>>,
83    pub role: CacheRole,
84}
85
86/// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader
87pub struct ArrayReaderBuilder<'a> {
88    /// Source of row group data
89    row_groups: &'a dyn RowGroups,
90    /// Optional cache options for the array reader
91    cache_options: Option<&'a CacheOptions<'a>>,
92    /// metrics
93    metrics: &'a ArrowReaderMetrics,
94}
95
96impl<'a> ArrayReaderBuilder<'a> {
97    pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
98        Self {
99            row_groups,
100            cache_options: None,
101            metrics,
102        }
103    }
104
105    /// Add cache options to the builder
106    pub fn with_cache_options(mut self, cache_options: Option<&'a CacheOptions<'a>>) -> Self {
107        self.cache_options = cache_options;
108        self
109    }
110
111    /// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
112    pub fn build_array_reader(
113        &self,
114        field: Option<&ParquetField>,
115        mask: &ProjectionMask,
116    ) -> Result<Box<dyn ArrayReader>> {
117        let reader = field
118            .and_then(|field| self.build_reader(field, mask).transpose())
119            .transpose()?
120            .unwrap_or_else(|| make_empty_array_reader(self.num_rows()));
121
122        Ok(reader)
123    }
124
125    /// Return the total number of rows
126    fn num_rows(&self) -> usize {
127        self.row_groups.num_rows()
128    }
129
130    fn build_reader(
131        &self,
132        field: &ParquetField,
133        mask: &ProjectionMask,
134    ) -> Result<Option<Box<dyn ArrayReader>>> {
135        match field.field_type {
136            ParquetFieldType::Primitive { col_idx, .. } => {
137                let Some(reader) = self.build_primitive_reader(field, mask)? else {
138                    return Ok(None);
139                };
140                let Some(cache_options) = self.cache_options.as_ref() else {
141                    return Ok(Some(reader));
142                };
143
144                if cache_options.projection_mask.leaf_included(col_idx) {
145                    Ok(Some(Box::new(CachedArrayReader::new(
146                        reader,
147                        Arc::clone(&cache_options.cache),
148                        col_idx,
149                        cache_options.role,
150                        self.metrics.clone(), // cheap clone
151                    ))))
152                } else {
153                    Ok(Some(reader))
154                }
155            }
156            ParquetFieldType::Group { .. } => match &field.arrow_type {
157                DataType::Map(_, _) => self.build_map_reader(field, mask),
158                DataType::Struct(_) => self.build_struct_reader(field, mask),
159                DataType::List(_) => self.build_list_reader(field, mask, false),
160                DataType::LargeList(_) => self.build_list_reader(field, mask, true),
161                DataType::FixedSizeList(_, _) => self.build_fixed_size_list_reader(field, mask),
162                d => unimplemented!("reading group type {} not implemented", d),
163            },
164        }
165    }
166
167    /// Build array reader for map type.
168    fn build_map_reader(
169        &self,
170        field: &ParquetField,
171        mask: &ProjectionMask,
172    ) -> Result<Option<Box<dyn ArrayReader>>> {
173        let children = field.children().unwrap();
174        assert_eq!(children.len(), 2);
175
176        let key_reader = self.build_reader(&children[0], mask)?;
177        let value_reader = self.build_reader(&children[1], mask)?;
178
179        match (key_reader, value_reader) {
180            (Some(key_reader), Some(value_reader)) => {
181                // Need to retrieve underlying data type to handle projection
182                let key_type = key_reader.get_data_type().clone();
183                let value_type = value_reader.get_data_type().clone();
184
185                let data_type = match &field.arrow_type {
186                    DataType::Map(map_field, is_sorted) => match map_field.data_type() {
187                        DataType::Struct(fields) => {
188                            assert_eq!(fields.len(), 2);
189                            let struct_field = map_field.as_ref().clone().with_data_type(
190                                DataType::Struct(Fields::from(vec![
191                                    fields[0].as_ref().clone().with_data_type(key_type),
192                                    fields[1].as_ref().clone().with_data_type(value_type),
193                                ])),
194                            );
195                            DataType::Map(Arc::new(struct_field), *is_sorted)
196                        }
197                        _ => unreachable!(),
198                    },
199                    _ => unreachable!(),
200                };
201
202                Ok(Some(Box::new(MapArrayReader::new(
203                    key_reader,
204                    value_reader,
205                    data_type,
206                    field.def_level,
207                    field.rep_level,
208                    field.nullable,
209                ))))
210            }
211            (None, None) => Ok(None),
212            _ => Err(general_err!(
213                "partial projection of MapArray is not supported"
214            )),
215        }
216    }
217
218    /// Build array reader for list type.
219    fn build_list_reader(
220        &self,
221        field: &ParquetField,
222        mask: &ProjectionMask,
223        is_large: bool,
224    ) -> Result<Option<Box<dyn ArrayReader>>> {
225        let children = field.children().unwrap();
226        assert_eq!(children.len(), 1);
227
228        let reader = match self.build_reader(&children[0], mask)? {
229            Some(item_reader) => {
230                // Need to retrieve underlying data type to handle projection
231                let item_type = item_reader.get_data_type().clone();
232                let data_type = match &field.arrow_type {
233                    DataType::List(f) => {
234                        DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
235                    }
236                    DataType::LargeList(f) => {
237                        DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
238                    }
239                    _ => unreachable!(),
240                };
241
242                let reader = match is_large {
243                    false => Box::new(ListArrayReader::<i32>::new(
244                        item_reader,
245                        data_type,
246                        field.def_level,
247                        field.rep_level,
248                        field.nullable,
249                    )) as _,
250                    true => Box::new(ListArrayReader::<i64>::new(
251                        item_reader,
252                        data_type,
253                        field.def_level,
254                        field.rep_level,
255                        field.nullable,
256                    )) as _,
257                };
258                Some(reader)
259            }
260            None => None,
261        };
262        Ok(reader)
263    }
264
265    /// Build array reader for fixed-size list type.
266    fn build_fixed_size_list_reader(
267        &self,
268        field: &ParquetField,
269        mask: &ProjectionMask,
270    ) -> Result<Option<Box<dyn ArrayReader>>> {
271        let children = field.children().unwrap();
272        assert_eq!(children.len(), 1);
273
274        let reader = match self.build_reader(&children[0], mask)? {
275            Some(item_reader) => {
276                let item_type = item_reader.get_data_type().clone();
277                let reader = match &field.arrow_type {
278                    &DataType::FixedSizeList(ref f, size) => {
279                        let data_type = DataType::FixedSizeList(
280                            Arc::new(f.as_ref().clone().with_data_type(item_type)),
281                            size,
282                        );
283
284                        Box::new(FixedSizeListArrayReader::new(
285                            item_reader,
286                            size as usize,
287                            data_type,
288                            field.def_level,
289                            field.rep_level,
290                            field.nullable,
291                        )) as _
292                    }
293                    _ => unimplemented!(),
294                };
295                Some(reader)
296            }
297            None => None,
298        };
299        Ok(reader)
300    }
301
302    /// Creates primitive array reader for each primitive type.
303    fn build_primitive_reader(
304        &self,
305        field: &ParquetField,
306        mask: &ProjectionMask,
307    ) -> Result<Option<Box<dyn ArrayReader>>> {
308        let (col_idx, primitive_type) = match &field.field_type {
309            ParquetFieldType::Primitive {
310                col_idx,
311                primitive_type,
312            } => match primitive_type.as_ref() {
313                Type::PrimitiveType { .. } => (*col_idx, primitive_type.clone()),
314                Type::GroupType { .. } => unreachable!(),
315            },
316            _ => unreachable!(),
317        };
318
319        if !mask.leaf_included(col_idx) {
320            return Ok(None);
321        }
322
323        let physical_type = primitive_type.get_physical_type();
324
325        // We don't track the column path in ParquetField as it adds a potential source
326        // of bugs when the arrow mapping converts more than one level in the parquet
327        // schema into a single arrow field.
328        //
329        // None of the readers actually use this field, but it is required for this type,
330        // so just stick a placeholder in
331        let column_desc = Arc::new(ColumnDescriptor::new(
332            primitive_type,
333            field.def_level,
334            field.rep_level,
335            ColumnPath::new(vec![]),
336        ));
337
338        let page_iterator = self.row_groups.column_chunks(col_idx)?;
339        let arrow_type = Some(field.arrow_type.clone());
340
341        let reader = match physical_type {
342            PhysicalType::BOOLEAN => Box::new(PrimitiveArrayReader::<BoolType>::new(
343                page_iterator,
344                column_desc,
345                arrow_type,
346            )?) as _,
347            PhysicalType::INT32 => {
348                if let Some(DataType::Null) = arrow_type {
349                    Box::new(NullArrayReader::<Int32Type>::new(
350                        page_iterator,
351                        column_desc,
352                    )?) as _
353                } else {
354                    Box::new(PrimitiveArrayReader::<Int32Type>::new(
355                        page_iterator,
356                        column_desc,
357                        arrow_type,
358                    )?) as _
359                }
360            }
361            PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
362                page_iterator,
363                column_desc,
364                arrow_type,
365            )?) as _,
366            PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
367                page_iterator,
368                column_desc,
369                arrow_type,
370            )?) as _,
371            PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
372                page_iterator,
373                column_desc,
374                arrow_type,
375            )?) as _,
376            PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
377                page_iterator,
378                column_desc,
379                arrow_type,
380            )?) as _,
381            PhysicalType::BYTE_ARRAY => match arrow_type {
382                Some(DataType::Dictionary(_, _)) => {
383                    make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
384                }
385                Some(DataType::Utf8View | DataType::BinaryView) => {
386                    make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
387                }
388                _ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
389            },
390            PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
391                Some(DataType::Dictionary(_, _)) => {
392                    make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
393                }
394                _ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
395            },
396        };
397        Ok(Some(reader))
398    }
399
400    fn build_struct_reader(
401        &self,
402        field: &ParquetField,
403        mask: &ProjectionMask,
404    ) -> Result<Option<Box<dyn ArrayReader>>> {
405        let arrow_fields = match &field.arrow_type {
406            DataType::Struct(children) => children,
407            _ => unreachable!(),
408        };
409        let children = field.children().unwrap();
410        assert_eq!(arrow_fields.len(), children.len());
411
412        let mut readers = Vec::with_capacity(children.len());
413        let mut builder = SchemaBuilder::with_capacity(children.len());
414
415        for (arrow, parquet) in arrow_fields.iter().zip(children) {
416            if let Some(reader) = self.build_reader(parquet, mask)? {
417                // Need to retrieve underlying data type to handle projection
418                let child_type = reader.get_data_type().clone();
419                builder.push(arrow.as_ref().clone().with_data_type(child_type));
420                readers.push(reader);
421            }
422        }
423
424        if readers.is_empty() {
425            return Ok(None);
426        }
427
428        Ok(Some(Box::new(StructArrayReader::new(
429            DataType::Struct(builder.finish().fields),
430            readers,
431            field.def_level,
432            field.rep_level,
433            field.nullable,
434        ))))
435    }
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use arrow::datatypes::Field;
    use std::sync::Arc;

    #[test]
    fn test_create_array_reader() {
        let reader: Arc<dyn FileReader> =
            Arc::new(SerializedFileReader::new(get_test_file("nulls.snappy.parquet")).unwrap());
        let metadata = reader.metadata().file_metadata();
        let schema_descr = metadata.schema_descr();

        // Project only the first leaf column
        let mask = ProjectionMask::leaves(schema_descr, [0]);
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            schema_descr,
            ProjectionMask::all(),
            metadata.key_value_metadata(),
        )
        .unwrap();

        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .build_array_reader(fields.as_ref(), &mask)
            .unwrap();

        // Expected arrow type: struct { b_struct: struct { b_c_int: Int32 } }
        let inner = DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into());
        let expected = DataType::Struct(Fields::from(vec![Field::new("b_struct", inner, true)]));

        assert_eq!(array_reader.get_data_type(), &expected);
    }
}