1use crate::arrow::array_reader::ArrayReader;
19use crate::errors::ParquetError;
20use crate::errors::Result;
21use arrow_array::{
22 builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef, GenericListArray,
23 OffsetSizeTrait,
24};
25use arrow_buffer::Buffer;
26use arrow_buffer::ToByteSlice;
27use arrow_data::{transform::MutableArrayData, ArrayData};
28use arrow_schema::DataType as ArrowType;
29use std::any::Any;
30use std::cmp::Ordering;
31use std::marker::PhantomData;
32use std::sync::Arc;
33
/// Reads parquet data into an arrow `List` / `LargeList` array, where
/// `OffsetSize` selects between `i32` (`List`) and `i64` (`LargeList`) offsets.
pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
    /// Reader producing the flattened child items of all lists in a batch.
    item_reader: Box<dyn ArrayReader>,
    /// The arrow type of the list array produced by this reader.
    data_type: ArrowType,
    /// Definition level at which a list entry contains at least one item;
    /// levels below this denote empty or null lists (see `consume_batch`).
    def_level: i16,
    /// Repetition level of this list: child repetition levels below this value
    /// start a new list entry, equal values continue the current list.
    rep_level: i16,
    /// Whether the list itself is nullable (controls validity-buffer creation).
    nullable: bool,
    /// Ties the reader to the chosen offset width without storing a value.
    _marker: PhantomData<OffsetSize>,
}
46
47impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
48 pub fn new(
50 item_reader: Box<dyn ArrayReader>,
51 data_type: ArrowType,
52 def_level: i16,
53 rep_level: i16,
54 nullable: bool,
55 ) -> Self {
56 Self {
57 item_reader,
58 data_type,
59 def_level,
60 rep_level,
61 nullable,
62 _marker: PhantomData,
63 }
64 }
65}
66
/// [`ArrayReader`] implementation that reconstructs list boundaries from the
/// child reader's definition and repetition levels.
impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the arrow type of the list array this reader produces.
    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Reads up to `batch_size` records into the child reader, returning the
    /// number of item slots buffered (consumed later by [`Self::consume_batch`]).
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        let size = self.item_reader.read_records(batch_size)?;
        Ok(size)
    }

    /// Assembles the buffered child items into a list array.
    ///
    /// Walks the child (def, rep) level pairs exactly once, building:
    /// * the list offsets,
    /// * the list validity bitmap (only when `self.nullable`), and
    /// * filtered child data, dropping the one placeholder child slot that
    ///   each empty/null list contributes.
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let next_batch_array = self.item_reader.consume_batch()?;
        if next_batch_array.len() == 0 {
            return Ok(new_empty_array(&self.data_type));
        }

        // List reconstruction is impossible without levels: fail loudly
        // rather than guessing boundaries.
        let def_levels = self
            .item_reader
            .get_def_levels()
            .ok_or_else(|| general_err!("item_reader def levels are None."))?;

        let rep_levels = self
            .item_reader
            .get_rep_levels()
            .ok_or_else(|| general_err!("item_reader rep levels are None."))?;

        // Guard against the child length overflowing the offset type
        // (relevant for i32 offsets of a `List`).
        if OffsetSize::from_usize(next_batch_array.len()).is_none() {
            return Err(general_err!(
                "offset of {} would overflow list array",
                next_batch_array.len()
            ));
        }

        // A batch must start at a record boundary, i.e. repetition level 0.
        if !rep_levels.is_empty() && rep_levels[0] != 0 {
            return Err(general_err!("first repetition level of batch must be 0"));
        }

        // Offsets into the (filtered) child array; one extra slot is pushed
        // after the loop to terminate the final list.
        let mut list_offsets: Vec<OffsetSize> = Vec::with_capacity(next_batch_array.len() + 1);

        // Validity bitmap builder, only allocated for nullable lists.
        let mut validity = self
            .nullable
            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));

        // Number of child items kept so far (also the next offset value).
        let mut cur_offset = 0;

        // Start (in the ORIGINAL child array) of the current run of kept
        // items; `None` when no run is pending.
        let mut filter_start = None;

        // Number of placeholder child slots dropped so far; hence
        // `cur_offset + skipped` is the current index in the original child.
        let mut skipped = 0;

        let data = next_batch_array.to_data();
        let mut child_data_builder =
            MutableArrayData::new(vec![&data], false, next_batch_array.len());

        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
            match r.cmp(&self.rep_level) {
                Ordering::Greater => {
                    // Repeated element of a more deeply nested list: it does
                    // not affect this level's offsets, but it must be defined
                    // at least down to this list's items.
                    if *d < self.def_level {
                        return Err(general_err!(
                            "Encountered repetition level too large for definition level"
                        ));
                    }
                }
                Ordering::Equal => {
                    // Another item in the current list: keep it.
                    cur_offset += 1;
                }
                Ordering::Less => {
                    // A lower repetition level starts a new list entry:
                    // terminate the previous one by recording its end offset.
                    list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());

                    if *d >= self.def_level {
                        // The new list has at least one item; begin (or
                        // continue) a run of kept child slots.
                        filter_start.get_or_insert(cur_offset + skipped);

                        cur_offset += 1;

                        if let Some(validity) = validity.as_mut() {
                            validity.append(true)
                        }
                    } else {
                        // Empty or null list: its single placeholder child
                        // slot is dropped. Flush the pending kept run first.
                        if let Some(start) = filter_start.take() {
                            child_data_builder.extend(0, start, cur_offset + skipped);
                        }

                        // `d + 1 == def_level` distinguishes a present-but-
                        // empty list (valid) from a null list — presumably
                        // per parquet level semantics; matches the tests below.
                        if let Some(validity) = validity.as_mut() {
                            validity.append(*d + 1 == self.def_level)
                        }

                        skipped += 1;
                    }
                }
            }
            Ok(())
        })?;

        // Terminate the final list.
        list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());

        let child_data = if skipped == 0 {
            // Fast path: nothing was filtered, reuse the child data as-is.
            next_batch_array.to_data()
        } else {
            // Flush any trailing run of kept items before freezing.
            if let Some(start) = filter_start.take() {
                child_data_builder.extend(0, start, cur_offset + skipped)
            }

            child_data_builder.freeze()
        };

        // Sanity check: kept-item count must match the filtered child length.
        if cur_offset != child_data.len() {
            return Err(general_err!("Failed to reconstruct list from level data"));
        }

        let value_offsets = Buffer::from(list_offsets.to_byte_slice());

        let mut data_builder = ArrayData::builder(self.get_data_type().clone())
            .len(list_offsets.len() - 1)
            .add_buffer(value_offsets)
            .add_child_data(child_data);

        if let Some(builder) = validity {
            assert_eq!(builder.len(), list_offsets.len() - 1);
            data_builder = data_builder.null_bit_buffer(Some(builder.into()))
        }

        // SAFETY-adjacent: validity of the ArrayData is guaranteed by the
        // construction above, so the checked build is skipped.
        let list_data = unsafe { data_builder.build_unchecked() };

        let result_array = GenericListArray::<OffsetSize>::from(list_data);
        Ok(Arc::new(result_array))
    }

    /// Skips `num_records` records in the child reader.
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        self.item_reader.skip_records(num_records)
    }

    /// Exposes the child reader's definition levels (consumed by parent readers).
    fn get_def_levels(&self) -> Option<&[i16]> {
        self.item_reader.get_def_levels()
    }

    /// Exposes the child reader's repetition levels (consumed by parent readers).
    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.item_reader.get_rep_levels()
    }
}
245
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::build_array_reader;
    use crate::arrow::array_reader::list_array::ListArrayReader;
    use crate::arrow::array_reader::test_util::InMemoryArrayReader;
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask};
    use crate::file::properties::WriterProperties;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use arrow::datatypes::{Field, Int32Type as ArrowInt32, Int32Type};
    use arrow_array::{Array, PrimitiveArray};
    use arrow_data::ArrayDataBuilder;
    use arrow_schema::Fields;
    use std::sync::Arc;

    /// Builds a `List<data_type>` / `LargeList<data_type>` arrow type
    /// depending on `OffsetSize`.
    fn list_type<OffsetSize: OffsetSizeTrait>(
        data_type: ArrowType,
        item_nullable: bool,
    ) -> ArrowType {
        let field = Arc::new(Field::new_list_field(data_type, item_nullable));
        GenericListArray::<OffsetSize>::DATA_TYPE_CONSTRUCTOR(field)
    }

    /// Downcasts a dynamic `ArrayRef` to the concrete list array type.
    fn downcast<OffsetSize: OffsetSizeTrait>(array: &ArrayRef) -> &'_ GenericListArray<OffsetSize> {
        array
            .as_any()
            .downcast_ref::<GenericListArray<OffsetSize>>()
            .unwrap()
    }

    /// Converts `usize` offsets into a raw offset buffer of the right width.
    fn to_offsets<OffsetSize: OffsetSizeTrait>(values: Vec<usize>) -> Buffer {
        Buffer::from_iter(
            values
                .into_iter()
                .map(|x| OffsetSize::from_usize(x).unwrap()),
        )
    }

    /// Exercises a three-level nested list (`List<List<List<Int32>>>`) built
    /// from a flattened value stream with hand-computed def/rep levels,
    /// reading in two batches to also cover batch-boundary handling.
    fn test_nested_list<OffsetSize: OffsetSizeTrait>() {
        let l3_item_type = ArrowType::Int32;
        let l3_type = list_type::<OffsetSize>(l3_item_type, true);

        let l2_item_type = l3_type.clone();
        let l2_type = list_type::<OffsetSize>(l2_item_type, true);

        let l1_item_type = l2_type.clone();
        let l1_type = list_type::<OffsetSize>(l1_item_type, false);

        // Innermost leaf values of the expected array.
        let leaf = PrimitiveArray::<Int32Type>::from_iter(vec![
            Some(1),
            None,
            Some(4),
            Some(7),
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            None,
            Some(6),
            Some(11),
        ]);

        // Innermost list level (nullable, with empty lists).
        let offsets = to_offsets::<OffsetSize>(vec![0, 2, 2, 3, 3, 4, 4, 7, 10, 10, 11]);
        let l3 = ArrayDataBuilder::new(l3_type.clone())
            .len(10)
            .add_buffer(offsets)
            .add_child_data(leaf.into_data())
            .null_bit_buffer(Some(Buffer::from([0b11111101, 0b00000010])))
            .build()
            .unwrap();

        // Middle list level (non-nullable).
        let offsets = to_offsets::<OffsetSize>(vec![0, 4, 4, 5, 6, 9, 10]);
        let l2 = ArrayDataBuilder::new(l2_type.clone())
            .len(6)
            .add_buffer(offsets)
            .add_child_data(l3)
            .build()
            .unwrap();

        // Outermost list level (nullable).
        let offsets = to_offsets::<OffsetSize>(vec![0, 5, 5, 5, 6]);
        let l1 = ArrayDataBuilder::new(l1_type.clone())
            .len(4)
            .add_buffer(offsets)
            .add_child_data(l2)
            .null_bit_buffer(Some(Buffer::from([0b00001101])))
            .build()
            .unwrap();

        let expected = GenericListArray::<OffsetSize>::from(l1);

        // Flattened value stream as the item reader would produce it: the
        // extra `None`s are placeholder slots for empty/null lists.
        let values = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            Some(1),
            None,
            None,
            Some(4),
            None,
            None,
            Some(7),
            None,
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            None,
            Some(6),
            None,
            None,
            None,
            Some(11),
        ]));

        // Hand-computed definition/repetition levels matching `values`.
        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            values,
            Some(vec![6, 5, 3, 6, 4, 2, 6, 4, 6, 6, 6, 6, 5, 6, 3, 0, 1, 6]),
            Some(vec![0, 3, 2, 2, 2, 1, 1, 1, 1, 3, 3, 2, 3, 3, 2, 0, 0, 0]),
        );

        // Chain three list readers, innermost first, with per-level
        // def/rep levels and nullability.
        let l3 =
            ListArrayReader::<OffsetSize>::new(Box::new(item_array_reader), l3_type, 5, 3, true);

        let l2 = ListArrayReader::<OffsetSize>::new(Box::new(l3), l2_type, 3, 2, false);

        let mut l1 = ListArrayReader::<OffsetSize>::new(Box::new(l2), l1_type, 2, 1, true);

        let expected_1 = expected.slice(0, 2);
        let expected_2 = expected.slice(2, 2);

        // First batch: exactly two records.
        let actual = l1.next_batch(2).unwrap();
        assert_eq!(actual.as_ref(), &expected_1);

        // Second batch: request more than remains, get the rest.
        let actual = l1.next_batch(1024).unwrap();
        assert_eq!(actual.as_ref(), &expected_2);
    }

    /// Exercises a non-nullable (`REQUIRED`) list: empty lists occur but no
    /// null lists, so no validity buffer is produced.
    fn test_required_list<OffsetSize: OffsetSizeTrait>() {
        let expected =
            GenericListArray::<OffsetSize>::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![Some(1), None, Some(2)]),
                Some(vec![]),
                Some(vec![Some(3), Some(4)]),
                Some(vec![]),
                Some(vec![]),
                Some(vec![None, Some(1)]),
            ]);

        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
            Some(1),
            None,
            Some(2),
            None,
            Some(3),
            Some(4),
            None,
            None,
            None,
            Some(1),
        ]));

        // def 0 marks empty lists; def 1 a null item; def 2 a present item.
        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![2, 1, 2, 0, 2, 2, 0, 0, 1, 2]),
            Some(vec![0, 1, 1, 0, 0, 1, 0, 0, 0, 1]),
        );

        let mut list_array_reader = ListArrayReader::<OffsetSize>::new(
            Box::new(item_array_reader),
            list_type::<OffsetSize>(ArrowType::Int32, true),
            1,
            1,
            false,
        );

        let actual = list_array_reader.next_batch(1024).unwrap();
        let actual = downcast::<OffsetSize>(&actual);

        assert_eq!(&expected, actual)
    }

    /// Exercises a nullable list with a mix of null lists, empty lists,
    /// null items and present items.
    fn test_nullable_list<OffsetSize: OffsetSizeTrait>() {
        let expected =
            GenericListArray::<OffsetSize>::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![Some(1), None, Some(2)]),
                None,
                Some(vec![]),
                Some(vec![Some(3), Some(4)]),
                Some(vec![]),
                Some(vec![]),
                None,
                Some(vec![]),
                Some(vec![None, Some(1)]),
            ]);

        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
            Some(1),
            None,
            Some(2),
            None,
            None,
            Some(3),
            Some(4),
            None,
            None,
            None,
            None,
            None,
            Some(1),
        ]));

        // def 0 = null list, def 1 = empty list, def 2 = null item,
        // def 3 = present item (list def_level is 2 below).
        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![3, 2, 3, 0, 1, 3, 3, 1, 1, 0, 1, 2, 3]),
            Some(vec![0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]),
        );

        let mut list_array_reader = ListArrayReader::<OffsetSize>::new(
            Box::new(item_array_reader),
            list_type::<OffsetSize>(ArrowType::Int32, true),
            2,
            1,
            true,
        );

        let actual = list_array_reader.next_batch(1024).unwrap();
        let actual = downcast::<OffsetSize>(&actual);

        assert_eq!(&expected, actual)
    }

    /// Runs the full suite for one offset width.
    fn test_list_array<OffsetSize: OffsetSizeTrait>() {
        test_nullable_list::<OffsetSize>();
        test_required_list::<OffsetSize>();
        test_nested_list::<OffsetSize>();
    }

    #[test]
    fn test_list_array_reader() {
        test_list_array::<i32>();
    }

    #[test]
    fn test_large_list_array_reader() {
        test_list_array::<i64>()
    }

    /// End-to-end schema test: writes an empty file with a nested repeated
    /// group schema and checks the reader reconstructs the projected arrow
    /// type (and yields an empty batch).
    #[test]
    fn test_nested_lists() {
        // Parquet schema with multiple repeated (legacy list) groups.
        let message_type = "
        message table {
            REPEATED group table_info {
                REQUIRED BYTE_ARRAY name;
                REPEATED group cols {
                    REQUIRED BYTE_ARRAY name;
                    REQUIRED INT32 type;
                    OPTIONAL INT32 length;
                }
                REPEATED group tags {
                    REQUIRED BYTE_ARRAY name;
                    REQUIRED INT32 type;
                    OPTIONAL INT32 length;
                }
            }
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), None).unwrap();

        // Write an empty parquet file with this schema.
        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_max_row_group_size(200)
            .build();

        let writer = ArrowWriter::try_new(
            file.try_clone().unwrap(),
            Arc::new(arrow_schema),
            Some(props),
        )
        .unwrap();
        writer.close().unwrap();

        let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());

        let file_metadata = file_reader.metadata().file_metadata();
        let schema = file_metadata.schema_descr();
        // Project only the first leaf column (table_info.name).
        let mask = ProjectionMask::leaves(schema, vec![0]);
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            schema,
            ProjectionMask::all(),
            file_metadata.key_value_metadata(),
        )
        .unwrap();

        let mut array_reader = build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap();

        let batch = array_reader.next_batch(100).unwrap();
        assert_eq!(batch.data_type(), array_reader.get_data_type());
        // Repeated group maps to a list of structs named after the group.
        assert_eq!(
            batch.data_type(),
            &ArrowType::Struct(Fields::from(vec![Field::new(
                "table_info",
                ArrowType::List(Arc::new(Field::new(
                    "table_info",
                    ArrowType::Struct(vec![Field::new("name", ArrowType::Binary, false)].into()),
                    false
                ))),
                false
            )]))
        );
        // The file is empty, so the batch contains no rows.
        assert_eq!(batch.len(), 0);
    }
}