use std::sync::{Arc, RwLock};

use arrow_schema::{DataType, Fields, SchemaBuilder};

use crate::arrow::ProjectionMask;
use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
use crate::arrow::array_reader::cached_array_reader::CacheRole;
use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::row_group_cache::RowGroupCache;
use crate::arrow::array_reader::row_group_index::RowGroupIndexReader;
use crate::arrow::array_reader::row_number::RowNumberReader;
use crate::arrow::array_reader::{
    ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
    PrimitiveArrayReader, RowGroups, StructArrayReader, make_byte_array_dictionary_reader,
    make_byte_array_reader,
};
use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use crate::arrow::schema::{ParquetField, ParquetFieldType, VirtualColumnType};
use crate::basic::Type as PhysicalType;
use crate::data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type, Int96Type};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::ParquetMetaData;
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type};

#[derive(Debug, Clone)]
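/// Builder for [`CacheOptions`].
///
/// Couples a [`ProjectionMask`] (which leaf columns to cache) with a shared
/// [`RowGroupCache`], and produces producer- or consumer-side [`CacheOptions`].
///
/// Illustrative sketch (variable names are assumptions, not part of this API):
///
/// ```ignore
/// let builder = CacheOptionsBuilder::new(&cache_mask, &cache);
/// let producer_options = builder.clone().producer();
/// let consumer_options = builder.consumer();
/// ```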
pub struct CacheOptionsBuilder<'a> {
    pub projection_mask: &'a ProjectionMask,
    pub cache: &'a Arc<RwLock<RowGroupCache>>,
}

impl<'a> CacheOptionsBuilder<'a> {
    pub fn new(projection_mask: &'a ProjectionMask, cache: &'a Arc<RwLock<RowGroupCache>>) -> Self {
        Self {
            projection_mask,
            cache,
        }
    }

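    /// Build [`CacheOptions`] with [`CacheRole::Producer`].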
    pub fn producer(self) -> CacheOptions<'a> {
        CacheOptions {
            projection_mask: self.projection_mask,
            cache: self.cache,
            role: CacheRole::Producer,
        }
    }

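    /// Build [`CacheOptions`] with [`CacheRole::Consumer`].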
    pub fn consumer(self) -> CacheOptions<'a> {
        CacheOptions {
            projection_mask: self.projection_mask,
            cache: self.cache,
            role: CacheRole::Consumer,
        }
    }
}

#[derive(Clone)]
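/// Cache configuration handed to [`ArrayReaderBuilder`]: the leaf columns to
/// cache, the shared [`RowGroupCache`], and the [`CacheRole`] of this reader.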
pub struct CacheOptions<'a> {
    pub projection_mask: &'a ProjectionMask,
    pub cache: &'a Arc<RwLock<RowGroupCache>>,
    pub role: CacheRole,
}

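/// Builds [`ArrayReader`]s from parquet fields and a [`ProjectionMask`].
///
/// Usage sketch (mirrors the tests below; `row_groups`, `metadata`, `fields`,
/// and `mask` are assumed to already be in scope):
///
/// ```ignore
/// let metrics = ArrowReaderMetrics::disabled();
/// let array_reader = ArrayReaderBuilder::new(&row_groups, &metrics)
///     .with_parquet_metadata(&metadata)
///     .build_array_reader(fields.as_ref(), &mask)?;
/// ```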
pub struct ArrayReaderBuilder<'a> {
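    /// Source of column chunks and row counts for the row groups being read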
    row_groups: &'a dyn RowGroups,
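    /// Optional cache configuration; when set, selected leaf readers are wrapped in [`CachedArrayReader`]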
    cache_options: Option<&'a CacheOptions<'a>>,
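    /// File metadata, required only when building readers for virtual columns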
    parquet_metadata: Option<&'a ParquetMetaData>,
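    /// Metrics shared with the cached readers built by this builder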
    metrics: &'a ArrowReaderMetrics,
}

impl<'a> ArrayReaderBuilder<'a> {
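    /// Create a new builder for the given row groups and metrics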
    pub fn new(row_groups: &'a dyn RowGroups, metrics: &'a ArrowReaderMetrics) -> Self {
        Self {
            row_groups,
            cache_options: None,
            parquet_metadata: None,
            metrics,
        }
    }

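    /// Set the cache options used when wrapping primitive column readers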
    pub fn with_cache_options(mut self, cache_options: Option<&'a CacheOptions<'a>>) -> Self {
        self.cache_options = cache_options;
        self
    }

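    /// Provide the [`ParquetMetaData`], needed to build readers for virtual columns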
    pub fn with_parquet_metadata(mut self, parquet_metadata: &'a ParquetMetaData) -> Self {
        self.parquet_metadata = Some(parquet_metadata);
        self
    }

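    /// Create an [`ArrayReader`] for the columns selected by `mask`, falling back to
    /// an empty reader with the correct row count when no field is provided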
    pub fn build_array_reader(
        &self,
        field: Option<&ParquetField>,
        mask: &ProjectionMask,
    ) -> Result<Box<dyn ArrayReader>> {
        let reader = field
            .and_then(|field| self.build_reader(field, mask).transpose())
            .transpose()?
            .unwrap_or_else(|| make_empty_array_reader(self.num_rows()));

        Ok(reader)
    }

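    /// Total number of rows across the row groups being read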
    fn num_rows(&self) -> usize {
        self.row_groups.num_rows()
    }

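    /// Build the reader for a single [`ParquetField`], dispatching on whether it is a
    /// primitive leaf, a virtual column, or a group (struct, list, or map)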
    fn build_reader(
        &self,
        field: &ParquetField,
        mask: &ProjectionMask,
    ) -> Result<Option<Box<dyn ArrayReader>>> {
        match field.field_type {
            ParquetFieldType::Primitive { col_idx, .. } => {
                let Some(reader) = self.build_primitive_reader(field, mask)? else {
                    return Ok(None);
                };
                let Some(cache_options) = self.cache_options.as_ref() else {
                    return Ok(Some(reader));
                };

                if cache_options.projection_mask.leaf_included(col_idx) {
                    Ok(Some(Box::new(CachedArrayReader::new(
                        reader,
                        Arc::clone(cache_options.cache),
                        col_idx,
                        cache_options.role,
                        self.metrics.clone(),
                    ))))
                } else {
                    Ok(Some(reader))
                }
            }
            ParquetFieldType::Virtual(virtual_type) => {
                match virtual_type {
                    VirtualColumnType::RowNumber => Ok(Some(self.build_row_number_reader()?)),
                    VirtualColumnType::RowGroupIndex => {
                        Ok(Some(self.build_row_group_index_reader()?))
                    }
                }
            }
            ParquetFieldType::Group { .. } => match &field.arrow_type {
                DataType::Map(_, _) => self.build_map_reader(field, mask),
                DataType::Struct(_) => self.build_struct_reader(field, mask),
                DataType::List(_) => self.build_list_reader(field, mask, false),
                DataType::LargeList(_) => self.build_list_reader(field, mask, true),
                DataType::FixedSizeList(_, _) => self.build_fixed_size_list_reader(field, mask),
                d => unimplemented!("reading group type {} not implemented", d),
            },
        }
    }

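    /// Build a reader for the virtual row number column; requires [`ParquetMetaData`]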
    fn build_row_number_reader(&self) -> Result<Box<dyn ArrayReader>> {
        let parquet_metadata = self.parquet_metadata.ok_or_else(|| {
            ParquetError::General(
                "ParquetMetaData is required to read virtual row number columns.".to_string(),
            )
        })?;
        Ok(Box::new(RowNumberReader::try_new(
            parquet_metadata,
            self.row_groups.row_groups(),
        )?))
    }

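    /// Build a reader for the virtual row group index column; requires [`ParquetMetaData`]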
    fn build_row_group_index_reader(&self) -> Result<Box<dyn ArrayReader>> {
        let parquet_metadata = self.parquet_metadata.ok_or_else(|| {
            ParquetError::General(
                "ParquetMetaData is required to read virtual row group index columns.".to_string(),
            )
        })?;
        Ok(Box::new(RowGroupIndexReader::try_new(
            parquet_metadata,
            self.row_groups.row_groups(),
        )?))
    }

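    /// Build a [`MapArrayReader`] from the key and value children; either both
    /// children are projected or neither, partial projection is an error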
    fn build_map_reader(
        &self,
        field: &ParquetField,
        mask: &ProjectionMask,
    ) -> Result<Option<Box<dyn ArrayReader>>> {
        let children = field.children().unwrap();
        assert_eq!(children.len(), 2);

        let key_reader = self.build_reader(&children[0], mask)?;
        let value_reader = self.build_reader(&children[1], mask)?;

        match (key_reader, value_reader) {
            (Some(key_reader), Some(value_reader)) => {
                let key_type = key_reader.get_data_type().clone();
                let value_type = value_reader.get_data_type().clone();

                let data_type = match &field.arrow_type {
                    DataType::Map(map_field, is_sorted) => match map_field.data_type() {
                        DataType::Struct(fields) => {
                            assert_eq!(fields.len(), 2);
                            let struct_field = map_field.as_ref().clone().with_data_type(
                                DataType::Struct(Fields::from(vec![
                                    fields[0].as_ref().clone().with_data_type(key_type),
                                    fields[1].as_ref().clone().with_data_type(value_type),
                                ])),
                            );
                            DataType::Map(Arc::new(struct_field), *is_sorted)
                        }
                        _ => unreachable!(),
                    },
                    _ => unreachable!(),
                };

                Ok(Some(Box::new(MapArrayReader::new(
                    key_reader,
                    value_reader,
                    data_type,
                    field.def_level,
                    field.rep_level,
                    field.nullable,
                ))))
            }
            (None, None) => Ok(None),
            _ => Err(general_err!(
                "partial projection of MapArray is not supported"
            )),
        }
    }

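    /// Build a [`ListArrayReader`] over i32 offsets (`List`) or i64 offsets
    /// (`LargeList`) for the single child item, if that child is projected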
    fn build_list_reader(
        &self,
        field: &ParquetField,
        mask: &ProjectionMask,
        is_large: bool,
    ) -> Result<Option<Box<dyn ArrayReader>>> {
        let children = field.children().unwrap();
        assert_eq!(children.len(), 1);

        let reader = match self.build_reader(&children[0], mask)? {
            Some(item_reader) => {
                let item_type = item_reader.get_data_type().clone();
                let data_type = match &field.arrow_type {
                    DataType::List(f) => {
                        DataType::List(Arc::new(f.as_ref().clone().with_data_type(item_type)))
                    }
                    DataType::LargeList(f) => {
                        DataType::LargeList(Arc::new(f.as_ref().clone().with_data_type(item_type)))
                    }
                    _ => unreachable!(),
                };

                let reader = match is_large {
                    false => Box::new(ListArrayReader::<i32>::new(
                        item_reader,
                        data_type,
                        field.def_level,
                        field.rep_level,
                        field.nullable,
                    )) as _,
                    true => Box::new(ListArrayReader::<i64>::new(
                        item_reader,
                        data_type,
                        field.def_level,
                        field.rep_level,
                        field.nullable,
                    )) as _,
                };
                Some(reader)
            }
            None => None,
        };
        Ok(reader)
    }

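    /// Build a [`FixedSizeListArrayReader`] for the single child item, if that child is projected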
    fn build_fixed_size_list_reader(
        &self,
        field: &ParquetField,
        mask: &ProjectionMask,
    ) -> Result<Option<Box<dyn ArrayReader>>> {
        let children = field.children().unwrap();
        assert_eq!(children.len(), 1);

        let reader = match self.build_reader(&children[0], mask)? {
            Some(item_reader) => {
                let item_type = item_reader.get_data_type().clone();
                let reader = match &field.arrow_type {
                    &DataType::FixedSizeList(ref f, size) => {
                        let data_type = DataType::FixedSizeList(
                            Arc::new(f.as_ref().clone().with_data_type(item_type)),
                            size,
                        );

                        Box::new(FixedSizeListArrayReader::new(
                            item_reader,
                            size as usize,
                            data_type,
                            field.def_level,
                            field.rep_level,
                            field.nullable,
                        )) as _
                    }
                    _ => unimplemented!(),
                };
                Some(reader)
            }
            None => None,
        };
        Ok(reader)
    }

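    /// Build the reader for a primitive leaf column, selecting the concrete reader from
    /// the physical type and target arrow type; returns `None` if the leaf is not in `mask`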
    fn build_primitive_reader(
        &self,
        field: &ParquetField,
        mask: &ProjectionMask,
    ) -> Result<Option<Box<dyn ArrayReader>>> {
        let (col_idx, primitive_type) = match &field.field_type {
            ParquetFieldType::Primitive {
                col_idx,
                primitive_type,
            } => match primitive_type.as_ref() {
                Type::PrimitiveType { .. } => (*col_idx, primitive_type.clone()),
                Type::GroupType { .. } => unreachable!(),
            },
            _ => unreachable!(),
        };

        if !mask.leaf_included(col_idx) {
            return Ok(None);
        }

        let physical_type = primitive_type.get_physical_type();

        let column_desc = Arc::new(ColumnDescriptor::new(
            primitive_type,
            field.def_level,
            field.rep_level,
            ColumnPath::new(vec![]),
        ));

        let page_iterator = self.row_groups.column_chunks(col_idx)?;
        let arrow_type = Some(field.arrow_type.clone());

        let reader = match physical_type {
            PhysicalType::BOOLEAN => Box::new(PrimitiveArrayReader::<BoolType>::new(
                page_iterator,
                column_desc,
                arrow_type,
            )?) as _,
            PhysicalType::INT32 => {
                if let Some(DataType::Null) = arrow_type {
                    Box::new(NullArrayReader::<Int32Type>::new(
                        page_iterator,
                        column_desc,
                    )?) as _
                } else {
                    Box::new(PrimitiveArrayReader::<Int32Type>::new(
                        page_iterator,
                        column_desc,
                        arrow_type,
                    )?) as _
                }
            }
            PhysicalType::INT64 => Box::new(PrimitiveArrayReader::<Int64Type>::new(
                page_iterator,
                column_desc,
                arrow_type,
            )?) as _,
            PhysicalType::INT96 => Box::new(PrimitiveArrayReader::<Int96Type>::new(
                page_iterator,
                column_desc,
                arrow_type,
            )?) as _,
            PhysicalType::FLOAT => Box::new(PrimitiveArrayReader::<FloatType>::new(
                page_iterator,
                column_desc,
                arrow_type,
            )?) as _,
            PhysicalType::DOUBLE => Box::new(PrimitiveArrayReader::<DoubleType>::new(
                page_iterator,
                column_desc,
                arrow_type,
            )?) as _,
            PhysicalType::BYTE_ARRAY => match arrow_type {
                Some(DataType::Dictionary(_, _)) => {
                    make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
                }
                Some(DataType::Utf8View | DataType::BinaryView) => {
                    make_byte_view_array_reader(page_iterator, column_desc, arrow_type)?
                }
                _ => make_byte_array_reader(page_iterator, column_desc, arrow_type)?,
            },
            PhysicalType::FIXED_LEN_BYTE_ARRAY => match arrow_type {
                Some(DataType::Dictionary(_, _)) => {
                    make_byte_array_dictionary_reader(page_iterator, column_desc, arrow_type)?
                }
                _ => make_fixed_len_byte_array_reader(page_iterator, column_desc, arrow_type)?,
            },
        };
        Ok(Some(reader))
    }

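    /// Build a [`StructArrayReader`] from the projected children, returning `None`
    /// when none of the children are projected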
    fn build_struct_reader(
        &self,
        field: &ParquetField,
        mask: &ProjectionMask,
    ) -> Result<Option<Box<dyn ArrayReader>>> {
        let arrow_fields = match &field.arrow_type {
            DataType::Struct(children) => children,
            _ => unreachable!(),
        };
        let children = field.children().unwrap();
        assert_eq!(arrow_fields.len(), children.len());

        let mut readers = Vec::with_capacity(children.len());
        let mut builder = SchemaBuilder::with_capacity(children.len());

        for (arrow, parquet) in arrow_fields.iter().zip(children) {
            if let Some(reader) = self.build_reader(parquet, mask)? {
                let child_type = reader.get_data_type().clone();
                builder.push(arrow.as_ref().clone().with_data_type(child_type));
                readers.push(reader);
            }
        }

        if readers.is_empty() {
            return Ok(None);
        }

        Ok(Some(Box::new(StructArrayReader::new(
            DataType::Struct(builder.finish().fields),
            readers,
            field.def_level,
            field.rep_level,
            field.nullable,
        ))))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::arrow::schema::virtual_type::RowNumber;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::util::test_common::file_util::get_test_file;
    use arrow::datatypes::Field;
    use std::sync::Arc;

    #[test]
    fn test_create_array_reader() {
        let file = get_test_file("nulls.snappy.parquet");
        let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());

        let file_metadata = file_reader.metadata().file_metadata();
        let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            file_metadata.schema_descr(),
            ProjectionMask::all(),
            file_metadata.key_value_metadata(),
            &[],
        )
        .unwrap();

        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
            .build_array_reader(fields.as_ref(), &mask)
            .unwrap();

        let arrow_type = DataType::Struct(Fields::from(vec![Field::new(
            "b_struct",
            DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into()),
            true,
        )]));

        assert_eq!(array_reader.get_data_type(), &arrow_type);
    }

    #[test]
    fn test_create_array_reader_with_row_numbers() {
        let file = get_test_file("nulls.snappy.parquet");
        let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());

        let file_metadata = file_reader.metadata().file_metadata();
        let mask = ProjectionMask::leaves(file_metadata.schema_descr(), [0]);
        let row_number_field = Arc::new(
            Field::new("row_number", DataType::Int64, false).with_extension_type(RowNumber),
        );
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            file_metadata.schema_descr(),
            ProjectionMask::all(),
            file_metadata.key_value_metadata(),
            std::slice::from_ref(&row_number_field),
        )
        .unwrap();

        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(&file_reader, &metrics)
            .with_parquet_metadata(file_reader.metadata())
            .build_array_reader(fields.as_ref(), &mask)
            .unwrap();

        let arrow_type = DataType::Struct(Fields::from(vec![
            Field::new(
                "b_struct",
                DataType::Struct(vec![Field::new("b_c_int", DataType::Int32, true)].into()),
                true,
            ),
            (*row_number_field).clone(),
        ]));

        assert_eq!(array_reader.get_data_type(), &arrow_type);
    }
}