parquet/arrow/array_reader/
builder.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use arrow_schema::{DataType, Fields, SchemaBuilder};
21
22use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
23use crate::arrow::array_reader::empty_array::make_empty_array_reader;
24use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
25use crate::arrow::array_reader::{
26    make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
27    FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
28    PrimitiveArrayReader, RowGroups, StructArrayReader,
29};
30use crate::arrow::schema::{ParquetField, ParquetFieldType};
31use crate::arrow::ProjectionMask;
32use crate::basic::Type as PhysicalType;
33use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
34use crate::errors::{ParquetError, Result};
35use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
36
37/// Create array reader from parquet schema, projection mask, and parquet file reader.
38pub fn build_array_reader(
39    field: Option<&ParquetField>,
40    mask: &ProjectionMask,
41    row_groups: &dyn RowGroups,
42) -> Result<Box<dyn ArrayReader>> {
43    let reader = field
44        .and_then(|field| build_reader(field, mask, row_groups).transpose())
45        .transpose()?
46        .unwrap_or_else(|| make_empty_array_reader(row_groups.num_rows()));
47
48    Ok(reader)
49}
50
51fn build_reader(
52    field: &ParquetField,
53    mask: &ProjectionMask,
54    row_groups: &dyn RowGroups,
55) -> Result<Option<Box<dyn ArrayReader>>> {
56    match field.field_type {
57        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, mask, row_groups),
58        ParquetFieldType::Group { .. } => match &field.arrow_type {
59            DataType::Map(_, _) => build_map_reader(field, mask, row_groups),
60            DataType::Struct(_) => build_struct_reader(field, mask, row_groups),
61            DataType::List(_) => build_list_reader(field, mask, false, row_groups),
62            DataType::LargeList(_) => build_list_reader(field, mask, true, row_groups),
63            DataType::FixedSizeList(_, _) => build_fixed_size_list_reader(field, mask, row_groups),
64            d => unimplemented!("reading group type {} not implemented", d),
65        },
66    }
67}
68
69/// Build array reader for map type.
70fn build_map_reader(
71    field: &ParquetField,
72    mask: &ProjectionMask,
73    row_groups: &dyn RowGroups,
74) -> Result<Option<Box<dyn ArrayReader>>> {
75    let children = field.children().unwrap();
76    assert_eq!(children.len(), 2);
77
78    let key_reader = build_reader(&children[0], mask, row_groups)?;
79    let value_reader = build_reader(&children[1], mask, row_groups)?;
80
81    match (key_reader, value_reader) {
82        (Some(key_reader), Some(value_reader)) => {
83            // Need to retrieve underlying data type to handle projection
84            let key_type = key_reader.get_data_type().clone();
85            let value_type = value_reader.get_data_type().clone();
86
87            let data_type = match &field.arrow_type {
88                DataType::Map(map_field, is_sorted) => match map_field.data_type() {
89                    DataType::Struct(fields) => {
90                        assert_eq!(fields.len(), 2);
91                        let struct_field = map_field.as_ref().clone().with_data_type(
92                            DataType::Struct(Fields::from(vec![
93                                fields[0].as_ref().clone().with_data_type(key_type),
94                                fields[1].as_ref().clone().with_data_type(value_type),
95                            ])),
96                        );
97                        DataType::Map(Arc::new(struct_field), *is_sorted)
98                    }
99                    _ => unreachable!(),
100                },
101                _ => unreachable!(),
102            };
103
104            Ok(Some(Box::new(MapArrayReader::new(
105                key_reader,
106                value_reader,
107                data_type,
108                field.def_level,
109                field.rep_level,
110                field.nullable,
111            ))))
112        }
113        (None, None) => Ok(None),
114        _ => Err(general_err!(
115            "partial projection of MapArray is not supported"
116        )),
117    }
118}
119
120/// Build array reader for list type.
121fn build_list_reader(
122    field: &ParquetField,
123    mask: &ProjectionMask,
124    is_large: bool,
125    row_groups: &dyn RowGroups,
126) -> Result<Option<Box<dyn ArrayReader>>> {
127    let children = field.children().unwrap();
128    assert_eq!(children.len(), 1);
129
130    let reader = match build_reader(&children[0], mask, row_groups)? {
131        Some(item_reader) => {
132            // Need to retrieve underlying data type to handle projection
133            let item_type = item_reader.get_data_type().clone();
134            let data_type = match &field.arrow_type {
135                DataType::List(f) => {
136                    DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
137                }
138                DataType::LargeList(f) => {
139                    DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
140                }
141                _ => unreachable!(),
142            };
143
144            let reader = match is_large {
145                false => Box::new(ListArrayReader::<i32>::new(
146                    item_reader,
147                    data_type,
148                    field.def_level,
149                    field.rep_level,
150                    field.nullable,
151                )) as _,
152                true => Box::new(ListArrayReader::<i64>::new(
153                    item_reader,
154                    data_type,
155                    field.def_level,
156                    field.rep_level,
157                    field.nullable,
158                )) as _,
159            };
160            Some(reader)
161        }
162        None => None,
163    };
164    Ok(reader)
165}
166
167/// Build array reader for fixed-size list type.
168fn build_fixed_size_list_reader(
169    field: &ParquetField,
170    mask: &ProjectionMask,
171    row_groups: &dyn RowGroups,
172) -> Result<Option<Box<dyn ArrayReader>>> {
173    let children = field.children().unwrap();
174    assert_eq!(children.len(), 1);
175
176    let reader = match build_reader(&children[0], mask, row_groups)? {
177        Some(item_reader) => {
178            let item_type = item_reader.get_data_type().clone();
179            let reader = match &field.arrow_type {
180                &DataType::FixedSizeList(ref f, size) => {
181                    let data_type = DataType::FixedSizeList(
182                        Arc::new(f.as_ref().clone().with_data_type(item_type)),
183                        size,
184                    );
185
186                    Box::new(FixedSizeListArrayReader::new(
187                        item_reader,
188                        size as usize,
189                        data_type,
190                        field.def_level,
191                        field.rep_level,
192                        field.nullable,
193                    )) as _
194                }
195                _ => unimplemented!(),
196            };
197            Some(reader)
198        }
199        None => None,
200    };
201    Ok(reader)
202}
203
204/// Creates primitive array reader for each primitive type.
205fn build_primitive_reader(
206    field: &ParquetField,
207    mask: &ProjectionMask,
208    row_groups: &dyn RowGroups,
209) -> Result<Option<Box<dyn ArrayReader>>> {
210    let (col_idx, primitive_type) = match &field.field_type {
211        ParquetFieldType::Primitive {
212            col_idx,
213            primitive_type,
214        } => match primitive_type.as_ref() {
215            Type::PrimitiveType { .. } => (*col_idx, primitive_type.clone()),
216            Type::GroupType { .. } => unreachable!(),
217        },
218        _ => unreachable!(),
219    };
220
221    if !mask.leaf_included(col_idx) {
222        return Ok(None);
223    }
224
225    let physical_type = primitive_type.get_physical_type();
226
227    // We don't track the column path in ParquetField as it adds a potential source
228    // of bugs when the arrow mapping converts more than one level in the parquet
229    // schema into a single arrow field.
230    //
231    // None of the readers actually use this field, but it is required for this type,
232    // so just stick a placeholder in
233    let column_desc = Arc::new(ColumnDescriptor::new(
234        primitive_type,
235        field.def_level,
236        field.rep_level,
237        ColumnPath::new(vec![]),
238    ));
239
240    let page_iterator = row_groups.column_chunks(col_idx)?;
241    let arrow_type = Some(field.arrow_type.clone());
242
243    let reader = match physical_type {
244        PhysicalType::BOOLEAN => Box::new(PrimitiveArrayReader::<BoolType>::new(
245            page_iterator,
246            column_desc,
247            arrow_type,
248        )?) as _,
249        PhysicalType::INT32 => {
250            if let Some(DataType::Null) = arrow_type {
251                Box::new(NullArrayReader::<Int32Type>::new(
252                    page_iterator,
253                    column_desc,
254                )?) as _
255            } else {
256                Box::new(PrimitiveArrayReader::<Int32Type>::new(
257                    page_iterator,
258                    column_desc,
259                    arrow_type,
260                )?) as _
261            }
262        }
263        PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
264            page_iterator,
265            column_desc,
266            arrow_type,
267        )?) as _,
268        PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
269            page_iterator,
270            column_desc,
271            arrow_type,
272        )?) as _,
273        PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
274            page_iterator,
275            column_desc,
276            arrow_type,
277        )?) as _,
278        PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
279            page_iterator,
280            column_desc,
281            arrow_type,
282        )?) as _,
283        PhysicalType::BYTE_ARRAY => match arrow_type {
284            Some(DataType::Dictionary(_, _)) => {
285                make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
286            }
287            Some(DataType::Utf8View | DataType::BinaryView) => {
288                make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
289            }
290            _ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
291        },
292        PhysicalType::FIXED_LEN_BYTE_ARRAY => {
293            make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?
294        }
295    };
296    Ok(Some(reader))
297}
298
299fn build_struct_reader(
300    field: &ParquetField,
301    mask: &ProjectionMask,
302    row_groups: &dyn RowGroups,
303) -> Result<Option<Box<dyn ArrayReader>>> {
304    let arrow_fields = match &field.arrow_type {
305        DataType::Struct(children) => children,
306        _ => unreachable!(),
307    };
308    let children = field.children().unwrap();
309    assert_eq!(arrow_fields.len(), children.len());
310
311    let mut readers = Vec::with_capacity(children.len());
312    let mut builder = SchemaBuilder::with_capacity(children.len());
313
314    for (arrow, parquet) in arrow_fields.iter().zip(children) {
315        if let Some(reader) = build_reader(parquet, mask, row_groups)? {
316            // Need to retrieve underlying data type to handle projection
317            let child_type = reader.get_data_type().clone();
318            builder.push(arrow.as_ref().clone().with_data_type(child_type));
319            readers.push(reader);
320        }
321    }
322
323    if readers.is_empty() {
324        return Ok(None);
325    }
326
327    Ok(Some(Box::new(StructArrayReader::new(
328        DataType::Struct(builder.finish().fields),
329        readers,
330        field.def_level,
331        field.rep_level,
332        field.nullable,
333    ))))
334}
335
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use arrow::datatypes::Field;
    use std::sync::Arc;

    /// Projecting only leaf 0 of `nulls.snappy.parquet` must yield a reader
    /// whose type is the struct path down to that single leaf.
    #[test]
    fn test_create_array_reader() {
        let reader: Arc<dyn FileReader> =
            Arc::new(SerializedFileReader::new(get_test_file("nulls.snappy.parquet")).unwrap());
        let metadata = reader.metadata().file_metadata();
        let schema_descr = metadata.schema_descr();

        let first_leaf_only = ProjectionMask::leaves(schema_descr, [0]);
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            schema_descr,
            ProjectionMask::all(),
            metadata.key_value_metadata(),
        )
        .unwrap();

        let array_reader =
            build_array_reader(fields.as_ref(), &first_leaf_only, &reader).unwrap();

        // Build the expected type from the innermost leaf outwards.
        let leaf = Field::new("b_c_int", DataType::Int32, true);
        let inner = Field::new("b_struct", DataType::Struct(Fields::from(vec![leaf])), true);
        let expected = DataType::Struct(Fields::from(vec![inner]));

        assert_eq!(array_reader.get_data_type(), &expected);
    }
}
370}