parquet/arrow/array_reader/list_array.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::ArrayReader;
use crate::errors::ParquetError;
use crate::errors::Result;
use arrow_array::{
    builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef, GenericListArray,
    OffsetSizeTrait,
};
use arrow_buffer::Buffer;
use arrow_buffer::ToByteSlice;
use arrow_data::{transform::MutableArrayData, ArrayData};
use arrow_schema::DataType as ArrowType;
use std::any::Any;
use std::cmp::Ordering;
use std::marker::PhantomData;
use std::sync::Arc;

/// Implementation of list array reader.
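///
/// Wraps a child [`ArrayReader`] for the list's items and rebuilds the list
/// structure (offsets and validity) from the definition and repetition levels
/// reported by that child reader.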
pub struct ListArrayReader<OffsetSize: OffsetSizeTrait> {
    item_reader: Box<dyn ArrayReader>,
    data_type: ArrowType,
    /// The definition level at which this list is not null
    def_level: i16,
    /// The repetition level that corresponds to a new value in this array
    rep_level: i16,
    /// If this list is nullable
    nullable: bool,
    _marker: PhantomData<OffsetSize>,
}

impl<OffsetSize: OffsetSizeTrait> ListArrayReader<OffsetSize> {
    /// Construct list array reader.
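    ///
    /// `def_level` is the definition level at which this list is non-null, and
    /// `rep_level` is the repetition level that corresponds to a new value within
    /// this list; repetition levels below `rep_level` start a new list. For
    /// example, the nullable list of nullable `Int32` values exercised by
    /// `test_nullable_list` below is constructed with `def_level = 2` and
    /// `rep_level = 1`.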
    pub fn new(
        item_reader: Box<dyn ArrayReader>,
        data_type: ArrowType,
        def_level: i16,
        rep_level: i16,
        nullable: bool,
    ) -> Self {
        Self {
            item_reader,
            data_type,
            def_level,
            rep_level,
            nullable,
            _marker: PhantomData,
        }
    }
}

/// Implementation of ListArrayReader.
impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns data type.
    /// This must be a List.
    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        let size = self.item_reader.read_records(batch_size)?;
        Ok(size)
    }

    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let next_batch_array = self.item_reader.consume_batch()?;
        if next_batch_array.len() == 0 {
            return Ok(new_empty_array(&self.data_type));
        }

        let def_levels = self
            .item_reader
            .get_def_levels()
            .ok_or_else(|| general_err!("item_reader def levels are None."))?;

        let rep_levels = self
            .item_reader
            .get_rep_levels()
            .ok_or_else(|| general_err!("item_reader rep levels are None."))?;

        if OffsetSize::from_usize(next_batch_array.len()).is_none() {
            return Err(general_err!(
                "offset of {} would overflow list array",
                next_batch_array.len()
            ));
        }

        if !rep_levels.is_empty() && rep_levels[0] != 0 {
            // This implies either the source data was invalid, or the leaf column
            // reader did not correctly delimit semantic records
            return Err(general_err!("first repetition level of batch must be 0"));
        }

        // A non-nullable list has a single definition level indicating if the list is empty
        //
        // A nullable list has two definition levels associated with it:
        //
        // The first identifies if the list is null
        // The second identifies if the list is empty
        //
        // The child data returned above is padded with a value for each not-fully defined level.
        // Therefore null and empty lists will correspond to a value in the child array.
        //
        // Whilst nulls may have a non-zero slice in the offsets array, empty lists must
        // be of zero length. As a result we MUST filter out values corresponding to empty
        // lists, and for consistency we do the same for nulls.
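        //
        // Illustrative example (hypothetical data, using the level encoding of
        // `test_nullable_list` below): for a nullable list of nullable Int32
        // values such as [[1, 2], [], null, [3]], the child reader yields 5 values
        // (one padding value each for the empty and the null list) with definition
        // levels [3, 3, 1, 0, 3] and repetition levels [0, 1, 0, 0, 0]. With
        // `def_level = 2` and `rep_level = 1`, the loop below produces offsets
        // [0, 2, 2, 2, 3], validity [true, true, false, true], and filtered child
        // values [1, 2, 3].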

        // The output offsets for the computed ListArray
        let mut list_offsets: Vec<OffsetSize> = Vec::with_capacity(next_batch_array.len() + 1);

        // The validity mask of the computed ListArray if nullable
        let mut validity = self
            .nullable
            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));

        // The offset into the filtered child data of the current level being considered
        let mut cur_offset = 0;

        // Identifies the start of a run of values to copy from the source child data
        let mut filter_start = None;

        // The number of child values skipped due to empty lists or nulls
        let mut skipped = 0;

        // Builder used to construct the filtered child data, skipping empty lists and nulls
        let data = next_batch_array.to_data();
        let mut child_data_builder =
            MutableArrayData::new(vec![&data], false, next_batch_array.len());

        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
            match r.cmp(&self.rep_level) {
                Ordering::Greater => {
                    // Repetition level greater than current => already handled by inner array
                    if *d < self.def_level {
                        return Err(general_err!(
                            "Encountered repetition level too large for definition level"
                        ));
                    }
                }
                Ordering::Equal => {
                    // New value in the current list
                    cur_offset += 1;
                }
                Ordering::Less => {
                    // Create new array slice
                    // Already checked that this cannot overflow
                    list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());

                    if *d >= self.def_level {
                        // Fully defined value

                        // Record current offset if it is None
                        filter_start.get_or_insert(cur_offset + skipped);

                        cur_offset += 1;

                        if let Some(validity) = validity.as_mut() {
                            validity.append(true)
                        }
                    } else {
                        // Flush the current slice of child values if any
                        if let Some(start) = filter_start.take() {
                            child_data_builder.extend(0, start, cur_offset + skipped);
                        }

                        if let Some(validity) = validity.as_mut() {
                            // Valid if empty list
                            validity.append(*d + 1 == self.def_level)
                        }

                        skipped += 1;
                    }
                }
            }
            Ok(())
        })?;

        list_offsets.push(OffsetSize::from_usize(cur_offset).unwrap());

        let child_data = if skipped == 0 {
            // No filtered values - can reuse original array
            next_batch_array.to_data()
        } else {
            // One or more filtered values - must build new array
            if let Some(start) = filter_start.take() {
                child_data_builder.extend(0, start, cur_offset + skipped)
            }

            child_data_builder.freeze()
        };

        if cur_offset != child_data.len() {
            return Err(general_err!("Failed to reconstruct list from level data"));
        }

        let value_offsets = Buffer::from(list_offsets.to_byte_slice());

        let mut data_builder = ArrayData::builder(self.get_data_type().clone())
            .len(list_offsets.len() - 1)
            .add_buffer(value_offsets)
            .add_child_data(child_data);

        if let Some(builder) = validity {
            assert_eq!(builder.len(), list_offsets.len() - 1);
            data_builder = data_builder.null_bit_buffer(Some(builder.into()))
        }

        let list_data = unsafe { data_builder.build_unchecked() };

        let result_array = GenericListArray::<OffsetSize>::from(list_data);
        Ok(Arc::new(result_array))
    }

    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        self.item_reader.skip_records(num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.item_reader.get_def_levels()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.item_reader.get_rep_levels()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::build_array_reader;
    use crate::arrow::array_reader::list_array::ListArrayReader;
    use crate::arrow::array_reader::test_util::InMemoryArrayReader;
    use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
    use crate::arrow::{parquet_to_arrow_schema, ArrowWriter, ProjectionMask};
    use crate::file::properties::WriterProperties;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::SchemaDescriptor;
    use arrow::datatypes::{Field, Int32Type as ArrowInt32, Int32Type};
    use arrow_array::{Array, PrimitiveArray};
    use arrow_data::ArrayDataBuilder;
    use arrow_schema::Fields;
    use std::sync::Arc;

    fn list_type<OffsetSize: OffsetSizeTrait>(
        data_type: ArrowType,
        item_nullable: bool,
    ) -> ArrowType {
        let field = Arc::new(Field::new_list_field(data_type, item_nullable));
        GenericListArray::<OffsetSize>::DATA_TYPE_CONSTRUCTOR(field)
    }

    fn downcast<OffsetSize: OffsetSizeTrait>(array: &ArrayRef) -> &'_ GenericListArray<OffsetSize> {
        array
            .as_any()
            .downcast_ref::<GenericListArray<OffsetSize>>()
            .unwrap()
    }

    fn to_offsets<OffsetSize: OffsetSizeTrait>(values: Vec<usize>) -> Buffer {
        Buffer::from_iter(
            values
                .into_iter()
                .map(|x| OffsetSize::from_usize(x).unwrap()),
        )
    }

    fn test_nested_list<OffsetSize: OffsetSizeTrait>() {
        // 3 levels of list nesting, with the outermost (l1) and innermost (l3) levels nullable:
        // [
        //     [
        //         [[1, null], null, [4], []],
        //         [],
        //         [[7]],
        //         [[]],
        //         [[1, 2, 3], [4, null, 6], null]
        //     ],
        //     null,
        //     [],
        //     [[[11]]]
        // ]

        let l3_item_type = ArrowType::Int32;
        let l3_type = list_type::<OffsetSize>(l3_item_type, true);

        let l2_item_type = l3_type.clone();
        let l2_type = list_type::<OffsetSize>(l2_item_type, true);

        let l1_item_type = l2_type.clone();
        let l1_type = list_type::<OffsetSize>(l1_item_type, false);

        let leaf = PrimitiveArray::<Int32Type>::from_iter(vec![
            Some(1),
            None,
            Some(4),
            Some(7),
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            None,
            Some(6),
            Some(11),
        ]);

        // [[1, null], null, [4], [], [7], [], [1, 2, 3], [4, null, 6], null, [11]]
        let offsets = to_offsets::<OffsetSize>(vec![0, 2, 2, 3, 3, 4, 4, 7, 10, 10, 11]);
        let l3 = ArrayDataBuilder::new(l3_type.clone())
            .len(10)
            .add_buffer(offsets)
            .add_child_data(leaf.into_data())
            .null_bit_buffer(Some(Buffer::from([0b11111101, 0b00000010])))
            .build()
            .unwrap();

        // [[[1, null], null, [4], []], [], [[7]], [[]], [[1, 2, 3], [4, null, 6], null], [[11]]]
        let offsets = to_offsets::<OffsetSize>(vec![0, 4, 4, 5, 6, 9, 10]);
        let l2 = ArrayDataBuilder::new(l2_type.clone())
            .len(6)
            .add_buffer(offsets)
            .add_child_data(l3)
            .build()
            .unwrap();

        let offsets = to_offsets::<OffsetSize>(vec![0, 5, 5, 5, 6]);
        let l1 = ArrayDataBuilder::new(l1_type.clone())
            .len(4)
            .add_buffer(offsets)
            .add_child_data(l2)
            .null_bit_buffer(Some(Buffer::from([0b00001101])))
            .build()
            .unwrap();

        let expected = GenericListArray::<OffsetSize>::from(l1);

        let values = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            Some(1),
            None,
            None,
            Some(4),
            None,
            None,
            Some(7),
            None,
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            None,
            Some(6),
            None,
            None,
            None,
            Some(11),
        ]));

        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            values,
            Some(vec![6, 5, 3, 6, 4, 2, 6, 4, 6, 6, 6, 6, 5, 6, 3, 0, 1, 6]),
            Some(vec![0, 3, 2, 2, 2, 1, 1, 1, 1, 3, 3, 2, 3, 3, 2, 0, 0, 0]),
        );

        let l3 =
            ListArrayReader::<OffsetSize>::new(Box::new(item_array_reader), l3_type, 5, 3, true);

        let l2 = ListArrayReader::<OffsetSize>::new(Box::new(l3), l2_type, 3, 2, false);

        let mut l1 = ListArrayReader::<OffsetSize>::new(Box::new(l2), l1_type, 2, 1, true);

        let expected_1 = expected.slice(0, 2);
        let expected_2 = expected.slice(2, 2);

        let actual = l1.next_batch(2).unwrap();
        assert_eq!(actual.as_ref(), &expected_1);

        let actual = l1.next_batch(1024).unwrap();
        assert_eq!(actual.as_ref(), &expected_2);
    }

    fn test_required_list<OffsetSize: OffsetSizeTrait>() {
        // [[1, null, 2], [], [3, 4], [], [], [null, 1]]
        let expected =
            GenericListArray::<OffsetSize>::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![Some(1), None, Some(2)]),
                Some(vec![]),
                Some(vec![Some(3), Some(4)]),
                Some(vec![]),
                Some(vec![]),
                Some(vec![None, Some(1)]),
            ]);

        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
            Some(1),
            None,
            Some(2),
            None,
            Some(3),
            Some(4),
            None,
            None,
            None,
            Some(1),
        ]));

        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![2, 1, 2, 0, 2, 2, 0, 0, 1, 2]),
            Some(vec![0, 1, 1, 0, 0, 1, 0, 0, 0, 1]),
        );

        let mut list_array_reader = ListArrayReader::<OffsetSize>::new(
            Box::new(item_array_reader),
            list_type::<OffsetSize>(ArrowType::Int32, true),
            1,
            1,
            false,
        );

        let actual = list_array_reader.next_batch(1024).unwrap();
        let actual = downcast::<OffsetSize>(&actual);

        assert_eq!(&expected, actual)
    }

    fn test_nullable_list<OffsetSize: OffsetSizeTrait>() {
        // [[1, null, 2], null, [], [3, 4], [], [], null, [], [null, 1]]
        let expected =
            GenericListArray::<OffsetSize>::from_iter_primitive::<Int32Type, _, _>(vec![
                Some(vec![Some(1), None, Some(2)]),
                None,
                Some(vec![]),
                Some(vec![Some(3), Some(4)]),
                Some(vec![]),
                Some(vec![]),
                None,
                Some(vec![]),
                Some(vec![None, Some(1)]),
            ]);

        let array = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![
            Some(1),
            None,
            Some(2),
            None,
            None,
            Some(3),
            Some(4),
            None,
            None,
            None,
            None,
            None,
            Some(1),
        ]));

        let item_array_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![3, 2, 3, 0, 1, 3, 3, 1, 1, 0, 1, 2, 3]),
            Some(vec![0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1]),
        );

        let mut list_array_reader = ListArrayReader::<OffsetSize>::new(
            Box::new(item_array_reader),
            list_type::<OffsetSize>(ArrowType::Int32, true),
            2,
            1,
            true,
        );

        let actual = list_array_reader.next_batch(1024).unwrap();
        let actual = downcast::<OffsetSize>(&actual);

        assert_eq!(&expected, actual)
    }

    fn test_list_array<OffsetSize: OffsetSizeTrait>() {
        test_nullable_list::<OffsetSize>();
        test_required_list::<OffsetSize>();
        test_nested_list::<OffsetSize>();
    }

    #[test]
    fn test_list_array_reader() {
        test_list_array::<i32>();
    }

    #[test]
    fn test_large_list_array_reader() {
        test_list_array::<i64>()
    }

    #[test]
    fn test_nested_lists() {
        // Construct column schema
        let message_type = "
        message table {
            REPEATED group table_info {
                REQUIRED BYTE_ARRAY name;
                REPEATED group cols {
                    REQUIRED BYTE_ARRAY name;
                    REQUIRED INT32 type;
                    OPTIONAL INT32 length;
                }
                REPEATED group tags {
                    REQUIRED BYTE_ARRAY name;
                    REQUIRED INT32 type;
                    OPTIONAL INT32 length;
                }
            }
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
            .unwrap();

        let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), None).unwrap();

        let file = tempfile::tempfile().unwrap();
        let props = WriterProperties::builder()
            .set_max_row_group_size(200)
            .build();

        let writer = ArrowWriter::try_new(
            file.try_clone().unwrap(),
            Arc::new(arrow_schema),
            Some(props),
        )
        .unwrap();
        writer.close().unwrap();

        let file_reader: Arc<dyn FileReader> = Arc::new(SerializedFileReader::new(file).unwrap());

        let file_metadata = file_reader.metadata().file_metadata();
        let schema = file_metadata.schema_descr();
        let mask = ProjectionMask::leaves(schema, vec![0]);
        let (_, fields) = parquet_to_arrow_schema_and_fields(
            schema,
            ProjectionMask::all(),
            file_metadata.key_value_metadata(),
        )
        .unwrap();

        let mut array_reader = build_array_reader(fields.as_ref(), &mask, &file_reader).unwrap();

        let batch = array_reader.next_batch(100).unwrap();
        assert_eq!(batch.data_type(), array_reader.get_data_type());
        assert_eq!(
            batch.data_type(),
            &ArrowType::Struct(Fields::from(vec![Field::new(
                "table_info",
                ArrowType::List(Arc::new(Field::new(
                    "table_info",
                    ArrowType::Struct(vec![Field::new("name", ArrowType::Binary, false)].into()),
                    false
                ))),
                false
            )]))
        );
        assert_eq!(batch.len(), 0);
    }
}