1use std::sync::{Arc, Mutex};
19
20use arrow_schema::{DataType, Fields, SchemaBuilder};
21
22use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
23use crate::arrow::array_reader::cached_array_reader::CacheRole;
24use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
25use crate::arrow::array_reader::empty_array::make_empty_array_reader;
26use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
27use crate::arrow::array_reader::row_group_cache::RowGroupCache;
28use crate::arrow::array_reader::{
29 make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader,
30 FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
31 PrimitiveArrayReader, RowGroups, StructArrayReader,
32};
33use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
34use crate::arrow::schema::{ParquetField, ParquetFieldType};
35use crate::arrow::ProjectionMask;
36use crate::basic::Type as PhysicalType;
37use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
38use crate::errors::{ParquetError, Result};
39use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};
40
41#[derive(Debug, Clone)]
43pub struct CacheOptionsBuilder<'a> {
44 pub projection_mask: &'a ProjectionMask,
46 pub cache: Arc<Mutex<RowGroupCache>>,
48}
49
50impl<'a> CacheOptionsBuilder<'a> {
51 pub fn new(projection_mask: &'a ProjectionMask, cache: Arc<Mutex<RowGroupCache>>) -> Self {
53 Self {
54 projection_mask,
55 cache,
56 }
57 }
58
59 pub fn producer(self) -> CacheOptions<'a> {
61 CacheOptions {
62 projection_mask: self.projection_mask,
63 cache: self.cache,
64 role: CacheRole::Producer,
65 }
66 }
67
68 pub fn consumer(self) -> CacheOptions<'a> {
70 CacheOptions {
71 projection_mask: self.projection_mask,
72 cache: self.cache,
73 role: CacheRole::Consumer,
74 }
75 }
76}
77
/// Caching configuration consulted by [`ArrayReaderBuilder`].
///
/// When these options are supplied to the builder, the primitive reader of
/// every leaf column selected by `projection_mask` is wrapped in a
/// [`CachedArrayReader`] that shares `cache` and is constructed with `role`.
#[derive(Clone)]
pub struct CacheOptions<'a> {
    /// Leaf columns whose readers are wrapped in a [`CachedArrayReader`].
    pub projection_mask: &'a ProjectionMask,
    /// Shared, mutex-guarded row group cache passed to each cached reader.
    pub cache: Arc<Mutex<RowGroupCache>>,
    /// Role passed to each cached reader.
    // NOTE(review): presumably decides whether the reader fills or reads the
    // cache — confirm against `CachedArrayReader`.
    pub role: CacheRole,
}
85
/// Builds [`ArrayReader`] trees from a [`ParquetField`] hierarchy and a
/// [`ProjectionMask`].
pub struct ArrayReaderBuilder<'a> {
    /// Source of the column chunks (`column_chunks`) and the row count
    /// (`num_rows`) used while constructing readers.
    row_groups: &'a dyn RowGroups,
    /// Optional caching configuration; `None` unless set through
    /// `with_cache_options`, in which case no reader is wrapped in a cache.
    cache_options: Option<&'a CacheOptions<'a>>,
    /// Metrics handle; a clone is handed to every [`CachedArrayReader`] created.
    metrics: &'a ArrowReaderMetrics,
}
95
96impl<'a> ArrayReaderBuilder<'a> {
97 pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
98 Self {
99 row_groups,
100 cache_options: None,
101 metrics,
102 }
103 }
104
105 pub fn with_cache_options(mut self, cache_options: Option<&'a CacheOptions<'a>>) -> Self {
107 self.cache_options = cache_options;
108 self
109 }
110
111 pub fn build_array_reader(
113 &self,
114 field: Option<&ParquetField>,
115 mask: &ProjectionMask,
116 ) -> Result<Box<dyn ArrayReader>> {
117 let reader = field
118 .and_then(|field| self.build_reader(field, mask).transpose())
119 .transpose()?
120 .unwrap_or_else(|| make_empty_array_reader(self.num_rows()));
121
122 Ok(reader)
123 }
124
125 fn num_rows(&self) -> usize {
127 self.row_groups.num_rows()
128 }
129
130 fn build_reader(
131 &self,
132 field: &ParquetField,
133 mask: &ProjectionMask,
134 ) -> Result<Option<Box<dyn ArrayReader>>> {
135 match field.field_type {
136 ParquetFieldType::Primitive { col_idx, .. } => {
137 let Some(reader) = self.build_primitive_reader(field, mask)? else {
138 return Ok(None);
139 };
140 let Some(cache_options) = self.cache_options.as_ref() else {
141 return Ok(Some(reader));
142 };
143
144 if cache_options.projection_mask.leaf_included(col_idx) {
145 Ok(Some(Box::new(CachedArrayReader::new(
146 reader,
147 Arc::clone(&cache_options.cache),
148 col_idx,
149 cache_options.role,
150 self.metrics.clone(), ))))
152 } else {
153 Ok(Some(reader))
154 }
155 }
156 ParquetFieldType::Group { .. } => match &field.arrow_type {
157 DataType::Map(_, _) => self.build_map_reader(field, mask),
158 DataType::Struct(_) => self.build_struct_reader(field, mask),
159 DataType::List(_) => self.build_list_reader(field, mask, false),
160 DataType::LargeList(_) => self.build_list_reader(field, mask, true),
161 DataType::FixedSizeList(_, _) => self.build_fixed_size_list_reader(field, mask),
162 d => unimplemented!("reading group type {} not implemented", d),
163 },
164 }
165 }
166
167 fn build_map_reader(
169 &self,
170 field: &ParquetField,
171 mask: &ProjectionMask,
172 ) -> Result<Option<Box<dyn ArrayReader>>> {
173 let children = field.children().unwrap();
174 assert_eq!(children.len(), 2);
175
176 let key_reader = self.build_reader(&children[0], mask)?;
177 let value_reader = self.build_reader(&children[1], mask)?;
178
179 match (key_reader, value_reader) {
180 (Some(key_reader), Some(value_reader)) => {
181 let key_type = key_reader.get_data_type().clone();
183 let value_type = value_reader.get_data_type().clone();
184
185 let data_type = match &field.arrow_type {
186 DataType::Map(map_field, is_sorted) => match map_field.data_type() {
187 DataType::Struct(fields) => {
188 assert_eq!(fields.len(), 2);
189 let struct_field = map_field.as_ref().clone().with_data_type(
190 DataType::Struct(Fields::from(vec![
191 fields[0].as_ref().clone().with_data_type(key_type),
192 fields[1].as_ref().clone().with_data_type(value_type),
193 ])),
194 );
195 DataType::Map(Arc::new(struct_field), *is_sorted)
196 }
197 _ => unreachable!(),
198 },
199 _ => unreachable!(),
200 };
201
202 Ok(Some(Box::new(MapArrayReader::new(
203 key_reader,
204 value_reader,
205 data_type,
206 field.def_level,
207 field.rep_level,
208 field.nullable,
209 ))))
210 }
211 (None, None) => Ok(None),
212 _ => Err(general_err!(
213 "partial projection of MapArray is not supported"
214 )),
215 }
216 }
217
218 fn build_list_reader(
220 &self,
221 field: &ParquetField,
222 mask: &ProjectionMask,
223 is_large: bool,
224 ) -> Result<Option<Box<dyn ArrayReader>>> {
225 let children = field.children().unwrap();
226 assert_eq!(children.len(), 1);
227
228 let reader = match self.build_reader(&children[0], mask)? {
229 Some(item_reader) => {
230 let item_type = item_reader.get_data_type().clone();
232 let data_type = match &field.arrow_type {
233 DataType::List(f) => {
234 DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
235 }
236 DataType::LargeList(f) => {
237 DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
238 }
239 _ => unreachable!(),
240 };
241
242 let reader = match is_large {
243 false => Box::new(ListArrayReader::<i32>::new(
244 item_reader,
245 data_type,
246 field.def_level,
247 field.rep_level,
248 field.nullable,
249 )) as _,
250 true => Box::new(ListArrayReader::<i64>::new(
251 item_reader,
252 data_type,
253 field.def_level,
254 field.rep_level,
255 field.nullable,
256 )) as _,
257 };
258 Some(reader)
259 }
260 None => None,
261 };
262 Ok(reader)
263 }
264
265 fn build_fixed_size_list_reader(
267 &self,
268 field: &ParquetField,
269 mask: &ProjectionMask,
270 ) -> Result<Option<Box<dyn ArrayReader>>> {
271 let children = field.children().unwrap();
272 assert_eq!(children.len(), 1);
273
274 let reader = match self.build_reader(&children[0], mask)? {
275 Some(item_reader) => {
276 let item_type = item_reader.get_data_type().clone();
277 let reader = match &field.arrow_type {
278 &DataType::FixedSizeList(ref f, size) => {
279 let data_type = DataType::FixedSizeList(
280 Arc::new(f.as_ref().clone().with_data_type(item_type)),
281 size,
282 );
283
284 Box::new(FixedSizeListArrayReader::new(
285 item_reader,
286 size as usize,
287 data_type,
288 field.def_level,
289 field.rep_level,
290 field.nullable,
291 )) as _
292 }
293 _ => unimplemented!(),
294 };
295 Some(reader)
296 }
297 None => None,
298 };
299 Ok(reader)
300 }
301
302 fn build_primitive_reader(
304 &self,
305 field: &ParquetField,
306 mask: &ProjectionMask,
307 ) -> Result<Option<Box<dyn ArrayReader>>> {
308 let (col_idx, primitive_type) = match &field.field_type {
309 ParquetFieldType::Primitive {
310 col_idx,
311 primitive_type,
312 } => match primitive_type.as_ref() {
313 Type::PrimitiveType { .. } => (*col_idx, primitive_type.clone()),
314 Type::GroupType { .. } => unreachable!(),
315 },
316 _ => unreachable!(),
317 };
318
319 if !mask.leaf_included(col_idx) {
320 return Ok(None);
321 }
322
323 let physical_type = primitive_type.get_physical_type();
324
325 let column_desc = Arc::new(ColumnDescriptor::new(
332 primitive_type,
333 field.def_level,
334 field.rep_level,
335 ColumnPath::new(vec![]),
336 ));
337
338 let page_iterator = self.row_groups.column_chunks(col_idx)?;
339 let arrow_type = Some(field.arrow_type.clone());
340
341 let reader = match physical_type {
342 PhysicalType::BOOLEAN => Box::new(PrimitiveArrayReader::<BoolType>::new(
343 page_iterator,
344 column_desc,
345 arrow_type,
346 )?) as _,
347 PhysicalType::INT32 => {
348 if let Some(DataType::Null) = arrow_type {
349 Box::new(NullArrayReader::<Int32Type>::new(
350 page_iterator,
351 column_desc,
352 )?) as _
353 } else {
354 Box::new(PrimitiveArrayReader::<Int32Type>::new(
355 page_iterator,
356 column_desc,
357 arrow_type,
358 )?) as _
359 }
360 }
361 PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
362 page_iterator,
363 column_desc,
364 arrow_type,
365 )?) as _,
366 PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
367 page_iterator,
368 column_desc,
369 arrow_type,
370 )?) as _,
371 PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
372 page_iterator,
373 column_desc,
374 arrow_type,
375 )?) as _,
376 PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
377 page_iterator,
378 column_desc,
379 arrow_type,
380 )?) as _,
381 PhysicalType::BYTE_ARRAY => match arrow_type {
382 Some(DataType::Dictionary(_, _)) => {
383 make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
384 }
385 Some(DataType::Utf8View | DataType::BinaryView) => {
386 make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
387 }
388 _ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
389 },
390 PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
391 Some(DataType::Dictionary(_, _)) => {
392 make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
393 }
394 _ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
395 },
396 };
397 Ok(Some(reader))
398 }
399
400 fn build_struct_reader(
401 &self,
402 field: &ParquetField,
403 mask: &ProjectionMask,
404 ) -> Result<Option<Box<dyn ArrayReader>>> {
405 let arrow_fields = match &field.arrow_type {
406 DataType::Struct(children) => children,
407 _ => unreachable!(),
408 };
409 let children = field.children().unwrap();
410 assert_eq!(arrow_fields.len(), children.len());
411
412 let mut readers = Vec::with_capacity(children.len());
413 let mut builder = SchemaBuilder::with_capacity(children.len());
414
415 for (arrow, parquet) in arrow_fields.iter().zip(children) {
416 if let Some(reader) = self.build_reader(parquet, mask)? {
417 let child_type = reader.get_data_type().clone();
419 builder.push(arrow.as_ref().clone().with_data_type(child_type));
420 readers.push(reader);
421 }
422 }
423
424 if readers.is_empty() {
425 return Ok(None);
426 }
427
428 Ok(Some(Box::new(StructArrayReader::new(
429 DataType::Struct(builder.finish().fields),
430 readers,
431 field.def_level,
432 field.rep_level,
433 field.nullable,
434 ))))
435 }
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use arrow::datatypes::Field;
    use std::sync::Arc;

    /// Projecting only leaf 0 of `nulls.snappy.parquet` must yield a reader
    /// whose data type is the nested struct containing just that leaf.
    #[test]
    fn test_create_array_reader() {
        let test_file = get_test_file("nulls.snappy.parquet");
        let file_reader: Arc<dyn FileReader> =
            Arc::new(SerializedFileReader::new(test_file).unwrap());

        let file_metadata = file_reader.metadata().file_metadata();
        let schema_descr = file_metadata.schema_descr();

        // The field tree covers the whole schema; only the mask restricts the projection.
        let mask = ProjectionMask::leaves(schema_descr, [0]);
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            schema_descr,
            ProjectionMask::all(),
            file_metadata.key_value_metadata(),
        )
        .unwrap();

        let metrics = ArrowReaderMetrics::disabled();
        let builder = ArrayReaderBuilder::new(&file_reader, &metrics);
        let array_reader = builder.build_array_reader(fields.as_ref(), &mask).unwrap();

        let inner_struct =
            DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into());
        let expected_type =
            DataType::Struct(Fields::from(vec![Field::new("b_struct", inner_struct, true)]));

        assert_eq!(array_reader.get_data_type(), &expected_type);
    }
}