1use std::sync::Arc;
19
20use arrow_schema::{DataType, Fields, SchemaBuilder};
21
22use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
23use crate::arrow::array_reader::empty_array::make_empty_array_reader;
24use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
25use crate::arrow::array_reader::{
26 make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
27 FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
28 PrimitiveArrayReader, RowGroups, StructArrayReader,
29};
30use crate::arrow::schema::{ParquetField, ParquetFieldType};
31use crate::arrow::ProjectionMask;
32use crate::basic::Type as PhysicalType;
33use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
34use crate::errors::{ParquetError, Result};
35use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
36
37pub fn build_array_reader(
39 field: Option<&ParquetField>,
40 mask: &ProjectionMask,
41 row_groups: &dyn RowGroups,
42) -> Result<Box<dyn ArrayReader>> {
43 let reader = field
44 .and_then(|field| build_reader(field, mask, row_groups).transpose())
45 .transpose()?
46 .unwrap_or_else(|| make_empty_array_reader(row_groups.num_rows()));
47
48 Ok(reader)
49}
50
51fn build_reader(
52 field: &ParquetField,
53 mask: &ProjectionMask,
54 row_groups: &dyn RowGroups,
55) -> Result<Option<Box<dyn ArrayReader>>> {
56 match field.field_type {
57 ParquetFieldType::Primitive { .. } => build_primitive_reader(field, mask, row_groups),
58 ParquetFieldType::Group { .. } => match &field.arrow_type {
59 DataType::Map(_, _) => build_map_reader(field, mask, row_groups),
60 DataType::Struct(_) => build_struct_reader(field, mask, row_groups),
61 DataType::List(_) => build_list_reader(field, mask, false, row_groups),
62 DataType::LargeList(_) => build_list_reader(field, mask, true, row_groups),
63 DataType::FixedSizeList(_, _) => build_fixed_size_list_reader(field, mask, row_groups),
64 d => unimplemented!("reading group type {} not implemented", d),
65 },
66 }
67}
68
69fn build_map_reader(
71 field: &ParquetField,
72 mask: &ProjectionMask,
73 row_groups: &dyn RowGroups,
74) -> Result<Option<Box<dyn ArrayReader>>> {
75 let children = field.children().unwrap();
76 assert_eq!(children.len(), 2);
77
78 let key_reader = build_reader(&children[0], mask, row_groups)?;
79 let value_reader = build_reader(&children[1], mask, row_groups)?;
80
81 match (key_reader, value_reader) {
82 (Some(key_reader), Some(value_reader)) => {
83 let key_type = key_reader.get_data_type().clone();
85 let value_type = value_reader.get_data_type().clone();
86
87 let data_type = match &field.arrow_type {
88 DataType::Map(map_field, is_sorted) => match map_field.data_type() {
89 DataType::Struct(fields) => {
90 assert_eq!(fields.len(), 2);
91 let struct_field = map_field.as_ref().clone().with_data_type(
92 DataType::Struct(Fields::from(vec![
93 fields[0].as_ref().clone().with_data_type(key_type),
94 fields[1].as_ref().clone().with_data_type(value_type),
95 ])),
96 );
97 DataType::Map(Arc::new(struct_field), *is_sorted)
98 }
99 _ => unreachable!(),
100 },
101 _ => unreachable!(),
102 };
103
104 Ok(Some(Box::new(MapArrayReader::new(
105 key_reader,
106 value_reader,
107 data_type,
108 field.def_level,
109 field.rep_level,
110 field.nullable,
111 ))))
112 }
113 (None, None) => Ok(None),
114 _ => Err(general_err!(
115 "partial projection of MapArray is not supported"
116 )),
117 }
118}
119
120fn build_list_reader(
122 field: &ParquetField,
123 mask: &ProjectionMask,
124 is_large: bool,
125 row_groups: &dyn RowGroups,
126) -> Result<Option<Box<dyn ArrayReader>>> {
127 let children = field.children().unwrap();
128 assert_eq!(children.len(), 1);
129
130 let reader = match build_reader(&children[0], mask, row_groups)? {
131 Some(item_reader) => {
132 let item_type = item_reader.get_data_type().clone();
134 let data_type = match &field.arrow_type {
135 DataType::List(f) => {
136 DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
137 }
138 DataType::LargeList(f) => {
139 DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
140 }
141 _ => unreachable!(),
142 };
143
144 let reader = match is_large {
145 false => Box::new(ListArrayReader::<i32>::new(
146 item_reader,
147 data_type,
148 field.def_level,
149 field.rep_level,
150 field.nullable,
151 )) as _,
152 true => Box::new(ListArrayReader::<i64>::new(
153 item_reader,
154 data_type,
155 field.def_level,
156 field.rep_level,
157 field.nullable,
158 )) as _,
159 };
160 Some(reader)
161 }
162 None => None,
163 };
164 Ok(reader)
165}
166
167fn build_fixed_size_list_reader(
169 field: &ParquetField,
170 mask: &ProjectionMask,
171 row_groups: &dyn RowGroups,
172) -> Result<Option<Box<dyn ArrayReader>>> {
173 let children = field.children().unwrap();
174 assert_eq!(children.len(), 1);
175
176 let reader = match build_reader(&children[0], mask, row_groups)? {
177 Some(item_reader) => {
178 let item_type = item_reader.get_data_type().clone();
179 let reader = match &field.arrow_type {
180 &DataType::FixedSizeList(ref f, size) => {
181 let data_type = DataType::FixedSizeList(
182 Arc::new(f.as_ref().clone().with_data_type(item_type)),
183 size,
184 );
185
186 Box::new(FixedSizeListArrayReader::new(
187 item_reader,
188 size as usize,
189 data_type,
190 field.def_level,
191 field.rep_level,
192 field.nullable,
193 )) as _
194 }
195 _ => unimplemented!(),
196 };
197 Some(reader)
198 }
199 None => None,
200 };
201 Ok(reader)
202}
203
204fn build_primitive_reader(
206 field: &ParquetField,
207 mask: &ProjectionMask,
208 row_groups: &dyn RowGroups,
209) -> Result<Option<Box<dyn ArrayReader>>> {
210 let (col_idx, primitive_type) = match &field.field_type {
211 ParquetFieldType::Primitive {
212 col_idx,
213 primitive_type,
214 } => match primitive_type.as_ref() {
215 Type::PrimitiveType { .. } => (*col_idx, primitive_type.clone()),
216 Type::GroupType { .. } => unreachable!(),
217 },
218 _ => unreachable!(),
219 };
220
221 if !mask.leaf_included(col_idx) {
222 return Ok(None);
223 }
224
225 let physical_type = primitive_type.get_physical_type();
226
227 let column_desc = Arc::new(ColumnDescriptor::new(
234 primitive_type,
235 field.def_level,
236 field.rep_level,
237 ColumnPath::new(vec![]),
238 ));
239
240 let page_iterator = row_groups.column_chunks(col_idx)?;
241 let arrow_type = Some(field.arrow_type.clone());
242
243 let reader = match physical_type {
244 PhysicalType::BOOLEAN => Box::new(PrimitiveArrayReader::<BoolType>::new(
245 page_iterator,
246 column_desc,
247 arrow_type,
248 )?) as _,
249 PhysicalType::INT32 => {
250 if let Some(DataType::Null) = arrow_type {
251 Box::new(NullArrayReader::<Int32Type>::new(
252 page_iterator,
253 column_desc,
254 )?) as _
255 } else {
256 Box::new(PrimitiveArrayReader::<Int32Type>::new(
257 page_iterator,
258 column_desc,
259 arrow_type,
260 )?) as _
261 }
262 }
263 PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
264 page_iterator,
265 column_desc,
266 arrow_type,
267 )?) as _,
268 PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
269 page_iterator,
270 column_desc,
271 arrow_type,
272 )?) as _,
273 PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
274 page_iterator,
275 column_desc,
276 arrow_type,
277 )?) as _,
278 PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
279 page_iterator,
280 column_desc,
281 arrow_type,
282 )?) as _,
283 PhysicalType::BYTE_ARRAY => match arrow_type {
284 Some(DataType::Dictionary(_, _)) => {
285 make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
286 }
287 Some(DataType::Utf8View | DataType::BinaryView) => {
288 make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
289 }
290 _ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
291 },
292 PhysicalType::FIXED_LEN_BYTE_ARRAY => {
293 make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?
294 }
295 };
296 Ok(Some(reader))
297}
298
299fn build_struct_reader(
300 field: &ParquetField,
301 mask: &ProjectionMask,
302 row_groups: &dyn RowGroups,
303) -> Result<Option<Box<dyn ArrayReader>>> {
304 let arrow_fields = match &field.arrow_type {
305 DataType::Struct(children) => children,
306 _ => unreachable!(),
307 };
308 let children = field.children().unwrap();
309 assert_eq!(arrow_fields.len(), children.len());
310
311 let mut readers = Vec::with_capacity(children.len());
312 let mut builder = SchemaBuilder::with_capacity(children.len());
313
314 for (arrow, parquet) in arrow_fields.iter().zip(children) {
315 if let Some(reader) = build_reader(parquet, mask, row_groups)? {
316 let child_type = reader.get_data_type().clone();
318 builder.push(arrow.as_ref().clone().with_data_type(child_type));
319 readers.push(reader);
320 }
321 }
322
323 if readers.is_empty() {
324 return Ok(None);
325 }
326
327 Ok(Some(Box::new(StructArrayReader::new(
328 DataType::Struct(builder.finish().fields),
329 readers,
330 field.def_level,
331 field.rep_level,
332 field.nullable,
333 ))))
334}
335
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use arrow::datatypes::Field;
    use std::sync::Arc;

    /// Projecting only leaf 0 of `nulls.snappy.parquet` should produce a reader whose
    /// type is a struct containing just `b_struct` with its `b_c_int` child.
    #[test]
    fn test_create_array_reader() {
        let parquet_file = get_test_file("nulls.snappy.parquet");
        let reader: Arc<dyn FileReader> =
            Arc::new(SerializedFileReader::new(parquet_file).unwrap());

        let metadata = reader.metadata().file_metadata();
        let schema = metadata.schema_descr();
        let first_leaf_only = ProjectionMask::leaves(schema, [0]);
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            schema,
            ProjectionMask::all(),
            metadata.key_value_metadata(),
        )
        .unwrap();

        let array_reader = build_array_reader(fields.as_ref(), &first_leaf_only, &reader).unwrap();

        let inner = DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into());
        let expected = DataType::Struct(Fields::from(vec![Field::new("b_struct", inner, true)]));
        assert_eq!(array_reader.get_data_type(), &expected);
    }
}