Skip to main content

parquet/arrow/array_reader/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::{Arc, RwLock};
19
20use arrow_schema::{DataType, Fields, SchemaBuilder};
21
22use crate::arrow::ProjectionMask;
23use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
24use crate::arrow::array_reader::cached_array_reader::CacheRole;
25use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
26use crate::arrow::array_reader::empty_array::make_empty_array_reader;
27use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
28use crate::arrow::array_reader::row_group_cache::RowGroupCache;
29use crate::arrow::array_reader::row_group_index::RowGroupIndexReader;
30use crate::arrow::array_reader::row_number::RowNumberReader;
31use crate::arrow::array_reader::{
32    ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
33    PrimitiveArrayReader, RowGroups, StructArrayReader, make_byte_array_dictionary_reader,
34    make_byte_array_reader,
35};
36use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
37use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType};
38use crate::basic::Type as PhysicalType;
39use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::ParquetMetaData;
42use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
43
/// Builder for [`CacheOptions`]
///
/// Holds a borrowed projection mask and a borrowed handle to the shared row
/// group cache. Call [`CacheOptionsBuilder::producer`] or
/// [`CacheOptionsBuilder::consumer`] to attach a [`CacheRole`] and obtain the
/// final [`CacheOptions`].
#[derive(Debug, Clone)]
pub struct CacheOptionsBuilder<'a> {
    /// Projection mask to apply to the cache
    pub projection_mask: &'a ProjectionMask,
    /// Cache to use for storing row groups
    pub cache: &'a Arc<RwLock<RowGroupCache>>,
}
52
53impl<'a> CacheOptionsBuilder<'a> {
54    /// create a new cache options builder
55    pub fn new(projection_mask: &'a ProjectionMask, cache: &'a Arc<RwLock<RowGroupCache>>) -> Self {
56        Self {
57            projection_mask,
58            cache,
59        }
60    }
61
62    /// Return a new [`CacheOptions`] for producing (populating) the cache
63    pub fn producer(self) -> CacheOptions<'a> {
64        CacheOptions {
65            projection_mask: self.projection_mask,
66            cache: self.cache,
67            role: CacheRole::Producer,
68        }
69    }
70
71    /// return a new [`CacheOptions`] for consuming (reading) the cache
72    pub fn consumer(self) -> CacheOptions<'a> {
73        CacheOptions {
74            projection_mask: self.projection_mask,
75            cache: self.cache,
76            role: CacheRole::Consumer,
77        }
78    }
79}
80
/// Cache options containing projection mask, cache, and role
///
/// Normally created through [`CacheOptionsBuilder::producer`] or
/// [`CacheOptionsBuilder::consumer`].
#[derive(Clone)]
pub struct CacheOptions<'a> {
    /// Leaf columns selected by this mask get their primitive reader wrapped
    /// in a [`CachedArrayReader`]; all other columns are read directly
    pub projection_mask: &'a ProjectionMask,
    /// Shared row group cache handed (via `Arc::clone`) to every
    /// [`CachedArrayReader`] that is created
    pub cache: &'a Arc<RwLock<RowGroupCache>>,
    /// Role passed to each [`CachedArrayReader`]: producer (populating the
    /// cache) or consumer (reading from it)
    pub role: CacheRole,
}
88
/// Builds [`ArrayReader`]s from parquet schema, projection mask, and RowGroups reader
///
/// Every field is a borrow, so the builder never outlives the row group
/// source, cache options, metadata, or metrics it was configured with.
pub struct ArrayReaderBuilder<'a> {
    /// Source of row group data (row count, column chunks, row group list)
    row_groups: &'a dyn RowGroups,
    /// Optional cache options for the array reader; `None` disables caching
    cache_options: Option<&'a CacheOptions<'a>>,
    /// Parquet metadata for computing virtual column values; building a
    /// row number or row group index reader fails without it
    parquet_metadata: Option<&'a ParquetMetaData>,
    /// Metrics handle cloned into each [`CachedArrayReader`] that is created
    metrics: &'a ArrowReaderMetrics,
}
100
101impl<'a> ArrayReaderBuilder<'a> {
102    pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
103        Self {
104            row_groups,
105            cache_options: None,
106            parquet_metadata: None,
107            metrics,
108        }
109    }
110
111    /// Add cache options to the builder
112    pub fn with_cache_options(mut self, cache_options: Option<&'a CacheOptions<'a>>) -> Self {
113        self.cache_options = cache_options;
114        self
115    }
116
117    /// Add parquet metadata to the builder for computing virtual column values
118    pub fn with_parquet_metadata(mut self, parquet_metadata: &'a ParquetMetaData) -> Self {
119        self.parquet_metadata = Some(parquet_metadata);
120        self
121    }
122
123    /// Create [`ArrayReader`] from parquet schema, projection mask, and parquet file reader.
124    pub fn build_array_reader(
125        &self,
126        field: Option<&ParquetField>,
127        mask: &ProjectionMask,
128    ) -> Result<Box<dyn ArrayReader>> {
129        let reader = field
130            .and_then(|field| self.build_reader(field, mask).transpose())
131            .transpose()?
132            .unwrap_or_else(|| make_empty_array_reader(self.num_rows()));
133
134        Ok(reader)
135    }
136
137    /// Return the total number of rows
138    fn num_rows(&self) -> usize {
139        self.row_groups.num_rows()
140    }
141
142    fn build_reader(
143        &self,
144        field: &ParquetField,
145        mask: &ProjectionMask,
146    ) -> Result<Option<Box<dyn ArrayReader>>> {
147        match field.field_type {
148            ParquetFieldType::Primitive { col_idx, .. } => {
149                let Some(reader) = self.build_primitive_reader(field, mask)? else {
150                    return Ok(None);
151                };
152                let Some(cache_options) = self.cache_options.as_ref() else {
153                    return Ok(Some(reader));
154                };
155
156                if cache_options.projection_mask.leaf_included(col_idx) {
157                    Ok(Some(Box::new(CachedArrayReader::new(
158                        reader,
159                        Arc::clone(cache_options.cache),
160                        col_idx,
161                        cache_options.role,
162                        self.metrics.clone(), // cheap clone
163                    ))))
164                } else {
165                    Ok(Some(reader))
166                }
167            }
168            ParquetFieldType::Virtual(virtual_type) => {
169                // Virtual columns don't have data in the parquet file
170                // They need to be built by specialized readers
171                match virtual_type {
172                    VirtualColumnType::RowNumber => Ok(Some(self.build_row_number_reader()?)),
173                    VirtualColumnType::RowGroupIndex => {
174                        Ok(Some(self.build_row_group_index_reader()?))
175                    }
176                }
177            }
178            ParquetFieldType::Group { .. } => match &field.arrow_type {
179                DataType::Map(_, _) => self.build_map_reader(field, mask),
180                DataType::Struct(_) => self.build_struct_reader(field, mask),
181                DataType::List(_) => self.build_list_reader(field, mask, false),
182                DataType::LargeList(_) => self.build_list_reader(field, mask, true),
183                DataType::FixedSizeList(_, _) => self.build_fixed_size_list_reader(field, mask),
184                d => unimplemented!("reading group type {} not implemented", d),
185            },
186        }
187    }
188
189    fn build_row_number_reader(&self) -> Result<Box<dyn ArrayReader>> {
190        let parquet_metadata = self.parquet_metadata.ok_or_else(|| {
191            ParquetError::General(
192                "ParquetMetaData is required to read virtual row number columns.".to_string(),
193            )
194        })?;
195        Ok(Box::new(RowNumberReader::try_new(
196            parquet_metadata,
197            self.row_groups.row_groups(),
198        )?))
199    }
200
201    fn build_row_group_index_reader(&self) -> Result<Box<dyn ArrayReader>> {
202        let parquet_metadata = self.parquet_metadata.ok_or_else(|| {
203            ParquetError::General(
204                "ParquetMetaData is required to read virtual row group index columns.".to_string(),
205            )
206        })?;
207        Ok(Box::new(RowGroupIndexReader::try_new(
208            parquet_metadata,
209            self.row_groups.row_groups(),
210        )?))
211    }
212
213    /// Build array reader for map type.
214    fn build_map_reader(
215        &self,
216        field: &ParquetField,
217        mask: &ProjectionMask,
218    ) -> Result<Option<Box<dyn ArrayReader>>> {
219        let children = field.children().unwrap();
220        assert_eq!(children.len(), 2);
221
222        let key_reader = self.build_reader(&children[0], mask)?;
223        let value_reader = self.build_reader(&children[1], mask)?;
224
225        match (key_reader, value_reader) {
226            (Some(key_reader), Some(value_reader)) => {
227                // Need to retrieve underlying data type to handle projection
228                let key_type = key_reader.get_data_type().clone();
229                let value_type = value_reader.get_data_type().clone();
230
231                let data_type = match &field.arrow_type {
232                    DataType::Map(map_field, is_sorted) => match map_field.data_type() {
233                        DataType::Struct(fields) => {
234                            assert_eq!(fields.len(), 2);
235                            let struct_field = map_field.as_ref().clone().with_data_type(
236                                DataType::Struct(Fields::from(vec![
237                                    fields[0].as_ref().clone().with_data_type(key_type),
238                                    fields[1].as_ref().clone().with_data_type(value_type),
239                                ])),
240                            );
241                            DataType::Map(Arc::new(struct_field), *is_sorted)
242                        }
243                        _ => unreachable!(),
244                    },
245                    _ => unreachable!(),
246                };
247
248                Ok(Some(Box::new(MapArrayReader::new(
249                    key_reader,
250                    value_reader,
251                    data_type,
252                    field.def_level,
253                    field.rep_level,
254                    field.nullable,
255                ))))
256            }
257            (None, None) => Ok(None),
258            _ => Err(general_err!(
259                "partial projection of MapArray is not supported"
260            )),
261        }
262    }
263
264    /// Build array reader for list type.
265    fn build_list_reader(
266        &self,
267        field: &ParquetField,
268        mask: &ProjectionMask,
269        is_large: bool,
270    ) -> Result<Option<Box<dyn ArrayReader>>> {
271        let children = field.children().unwrap();
272        assert_eq!(children.len(), 1);
273
274        let reader = match self.build_reader(&children[0], mask)? {
275            Some(item_reader) => {
276                // Need to retrieve underlying data type to handle projection
277                let item_type = item_reader.get_data_type().clone();
278                let data_type = match &field.arrow_type {
279                    DataType::List(f) => {
280                        DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
281                    }
282                    DataType::LargeList(f) => {
283                        DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
284                    }
285                    _ => unreachable!(),
286                };
287
288                let reader = match is_large {
289                    false => Box::new(ListArrayReader::<i32>::new(
290                        item_reader,
291                        data_type,
292                        field.def_level,
293                        field.rep_level,
294                        field.nullable,
295                    )) as _,
296                    true => Box::new(ListArrayReader::<i64>::new(
297                        item_reader,
298                        data_type,
299                        field.def_level,
300                        field.rep_level,
301                        field.nullable,
302                    )) as _,
303                };
304                Some(reader)
305            }
306            None => None,
307        };
308        Ok(reader)
309    }
310
311    /// Build array reader for fixed-size list type.
312    fn build_fixed_size_list_reader(
313        &self,
314        field: &ParquetField,
315        mask: &ProjectionMask,
316    ) -> Result<Option<Box<dyn ArrayReader>>> {
317        let children = field.children().unwrap();
318        assert_eq!(children.len(), 1);
319
320        let reader = match self.build_reader(&children[0], mask)? {
321            Some(item_reader) => {
322                let item_type = item_reader.get_data_type().clone();
323                let reader = match &field.arrow_type {
324                    &DataType::FixedSizeList(ref f, size) => {
325                        let data_type = DataType::FixedSizeList(
326                            Arc::new(f.as_ref().clone().with_data_type(item_type)),
327                            size,
328                        );
329
330                        Box::new(FixedSizeListArrayReader::new(
331                            item_reader,
332                            size as usize,
333                            data_type,
334                            field.def_level,
335                            field.rep_level,
336                            field.nullable,
337                        )) as _
338                    }
339                    _ => unimplemented!(),
340                };
341                Some(reader)
342            }
343            None => None,
344        };
345        Ok(reader)
346    }
347
348    /// Creates primitive array reader for each primitive type.
349    fn build_primitive_reader(
350        &self,
351        field: &ParquetField,
352        mask: &ProjectionMask,
353    ) -> Result<Option<Box<dyn ArrayReader>>> {
354        let (col_idx, primitive_type) = match &field.field_type {
355            ParquetFieldType::Primitive {
356                col_idx,
357                primitive_type,
358            } => match primitive_type.as_ref() {
359                Type::PrimitiveType { .. } => (*col_idx, primitive_type.clone()),
360                Type::GroupType { .. } => unreachable!(),
361            },
362            _ => unreachable!(),
363        };
364
365        if !mask.leaf_included(col_idx) {
366            return Ok(None);
367        }
368
369        let physical_type = primitive_type.get_physical_type();
370
371        // We don't track the column path in ParquetField as it adds a potential source
372        // of bugs when the arrow mapping converts more than one level in the parquet
373        // schema into a single arrow field.
374        //
375        // None of the readers actually use this field, but it is required for this type,
376        // so just stick a placeholder in
377        let column_desc = Arc::new(ColumnDescriptor::new(
378            primitive_type,
379            field.def_level,
380            field.rep_level,
381            ColumnPath::new(vec![]),
382        ));
383
384        let page_iterator = self.row_groups.column_chunks(col_idx)?;
385        let arrow_type = Some(field.arrow_type.clone());
386
387        let reader = match physical_type {
388            PhysicalType::BOOLEAN => Box::new(PrimitiveArrayReader::<BoolType>::new(
389                page_iterator,
390                column_desc,
391                arrow_type,
392            )?) as _,
393            PhysicalType::INT32 => {
394                if let Some(DataType::Null) = arrow_type {
395                    Box::new(NullArrayReader::<Int32Type>::new(
396                        page_iterator,
397                        column_desc,
398                    )?) as _
399                } else {
400                    Box::new(PrimitiveArrayReader::<Int32Type>::new(
401                        page_iterator,
402                        column_desc,
403                        arrow_type,
404                    )?) as _
405                }
406            }
407            PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
408                page_iterator,
409                column_desc,
410                arrow_type,
411            )?) as _,
412            PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
413                page_iterator,
414                column_desc,
415                arrow_type,
416            )?) as _,
417            PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
418                page_iterator,
419                column_desc,
420                arrow_type,
421            )?) as _,
422            PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
423                page_iterator,
424                column_desc,
425                arrow_type,
426            )?) as _,
427            PhysicalType::BYTE_ARRAY => match arrow_type {
428                Some(DataType::Dictionary(_, _)) => {
429                    make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
430                }
431                Some(DataType::Utf8View | DataType::BinaryView) => {
432                    make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
433                }
434                _ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
435            },
436            PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
437                Some(DataType::Dictionary(_, _)) => {
438                    make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
439                }
440                _ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
441            },
442        };
443        Ok(Some(reader))
444    }
445
446    fn build_struct_reader(
447        &self,
448        field: &ParquetField,
449        mask: &ProjectionMask,
450    ) -> Result<Option<Box<dyn ArrayReader>>> {
451        let arrow_fields = match &field.arrow_type {
452            DataType::Struct(children) => children,
453            _ => unreachable!(),
454        };
455        let children = field.children().unwrap();
456        assert_eq!(arrow_fields.len(), children.len());
457
458        let mut readers = Vec::with_capacity(children.len());
459        let mut builder = SchemaBuilder::with_capacity(children.len());
460
461        for (arrow, parquet) in arrow_fields.iter().zip(children) {
462            if let Some(reader) = self.build_reader(parquet, mask)? {
463                // Need to retrieve underlying data type to handle projection
464                let child_type = reader.get_data_type().clone();
465                builder.push(arrow.as_ref().clone().with_data_type(child_type));
466                readers.push(reader);
467            }
468        }
469
470        if readers.is_empty() {
471            return Ok(None);
472        }
473
474        Ok(Some(Box::new(StructArrayReader::new(
475            DataType::Struct(builder.finish().fields),
476            readers,
477            field.def_level,
478            field.rep_level,
479            field.nullable,
480        ))))
481    }
482}
483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::arrow::schema::virtual_type::RowNumber;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use arrow::datatypes::Field;
    use std::sync::Arc;

    /// Open the `nulls.snappy.parquet` fixture as a shared file reader
    fn open_nulls_reader() -> Arc<dyn FileReader> {
        let file = get_test_file("nulls.snappy.parquet");
        Arc::new(SerializedFileReader::new(file).unwrap())
    }

    /// The `b_struct` field both tests expect after projecting leaf 0
    fn expected_b_struct() -> Field {
        Field::new(
            "b_struct",
            DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into()),
            true,
        )
    }

    #[test]
    fn test_create_array_reader() {
        let file_reader = open_nulls_reader();

        let file_metadata = file_reader.metadata().file_metadata();
        let schema_descr = file_metadata.schema_descr();
        let mask = ProjectionMask::leaves(schema_descr, [0]);
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            schema_descr,
            ProjectionMask::all(),
            file_metadata.key_value_metadata(),
            &[],
        )
        .unwrap();

        let metrics = ArrowReaderMetrics::disabled();
        let builder = ArrayReaderBuilder::new(&file_reader, &metrics);
        let array_reader = builder.build_array_reader(fields.as_ref(), &mask).unwrap();

        // Only the projected leaf survives in the reader's data type
        let expected = DataType::Struct(Fields::from(vec![expected_b_struct()]));

        assert_eq!(array_reader.get_data_type(), &expected);
    }

    #[test]
    fn test_create_array_reader_with_row_numbers() {
        let file_reader = open_nulls_reader();

        let file_metadata = file_reader.metadata().file_metadata();
        let schema_descr = file_metadata.schema_descr();
        let mask = ProjectionMask::leaves(schema_descr, [0]);
        let row_number_field = Arc::new(
            Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber),
        );
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            schema_descr,
            ProjectionMask::all(),
            file_metadata.key_value_metadata(),
            std::slice::from_ref(&row_number_field),
        )
        .unwrap();

        let metrics = ArrowReaderMetrics::disabled();
        let builder = ArrayReaderBuilder::new(&file_reader, &metrics)
            .with_parquet_metadata(file_reader.metadata());
        let array_reader = builder.build_array_reader(fields.as_ref(), &mask).unwrap();

        // The virtual row number column is appended after the projected leaf
        let expected = DataType::Struct(Fields::from(vec![
            expected_b_struct(),
            (*row_number_field).clone(),
        ]));

        assert_eq!(array_reader.get_data_type(), &expected);
    }
}