parquet/arrow/array_reader/
fixed_size_list_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp::Ordering;
19use std::sync::Arc;
20
21use crate::arrow::array_reader::ArrayReader;
22use crate::errors::ParquetError;
23use crate::errors::Result;
24use arrow_array::FixedSizeListArray;
25use arrow_array::{builder::BooleanBufferBuilder, new_empty_array, Array, ArrayRef};
26use arrow_data::{transform::MutableArrayData, ArrayData};
27use arrow_schema::DataType as ArrowType;
28
29/// Implementation of fixed-size list array reader.
30pub struct FixedSizeListArrayReader {
31    item_reader: Box<dyn ArrayReader>,
32    /// The number of child items in each row of the list array
33    fixed_size: usize,
34    data_type: ArrowType,
35    /// The definition level at which this list is not null
36    def_level: i16,
37    /// The repetition level that corresponds to a new value in this array
38    rep_level: i16,
39    /// If the list is nullable
40    nullable: bool,
41}
42
43impl FixedSizeListArrayReader {
44    /// Construct fixed-size list array reader.
45    pub fn new(
46        item_reader: Box<dyn ArrayReader>,
47        fixed_size: usize,
48        data_type: ArrowType,
49        def_level: i16,
50        rep_level: i16,
51        nullable: bool,
52    ) -> Self {
53        Self {
54            item_reader,
55            fixed_size,
56            data_type,
57            def_level,
58            rep_level,
59            nullable,
60        }
61    }
62}
63
64impl ArrayReader for FixedSizeListArrayReader {
65    fn as_any(&self) -> &dyn std::any::Any {
66        self
67    }
68
69    fn get_data_type(&self) -> &ArrowType {
70        &self.data_type
71    }
72
73    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
74        let size = self.item_reader.read_records(batch_size)?;
75        Ok(size)
76    }
77
78    fn consume_batch(&mut self) -> Result<ArrayRef> {
79        let next_batch_array = self.item_reader.consume_batch()?;
80        if next_batch_array.is_empty() {
81            return Ok(new_empty_array(&self.data_type));
82        }
83
84        let def_levels = self
85            .get_def_levels()
86            .ok_or_else(|| general_err!("item_reader def levels are None"))?;
87        let rep_levels = self
88            .get_rep_levels()
89            .ok_or_else(|| general_err!("item_reader rep levels are None"))?;
90
91        if !rep_levels.is_empty() && rep_levels[0] != 0 {
92            // This implies either the source data was invalid, or the leaf column
93            // reader did not correctly delimit semantic records
94            return Err(general_err!("first repetition level of batch must be 0"));
95        }
96
97        let mut validity = self
98            .nullable
99            .then(|| BooleanBufferBuilder::new(next_batch_array.len()));
100
101        let data = next_batch_array.to_data();
102        let mut child_data_builder =
103            MutableArrayData::new(vec![&data], true, next_batch_array.len());
104
105        // The current index into the child array entries
106        let mut child_idx = 0;
107        // The total number of rows (valid and invalid) in the list array
108        let mut list_len = 0;
109        // Start of the current run of valid values
110        let mut start_idx = None;
111        let mut row_len = 0;
112
113        def_levels.iter().zip(rep_levels).try_for_each(|(d, r)| {
114            match r.cmp(&self.rep_level) {
115                Ordering::Greater => {
116                    // Repetition level greater than current => already handled by inner array
117                    if *d < self.def_level {
118                        return Err(general_err!(
119                            "Encountered repetition level too large for definition level"
120                        ));
121                    }
122                }
123                Ordering::Equal => {
124                    // Item inside of the current list
125                    child_idx += 1;
126                    row_len += 1;
127                }
128                Ordering::Less => {
129                    // Start of new list row
130                    list_len += 1;
131
132                    // Length of the previous row should be equal to:
133                    // - the list's fixed size (valid entries)
134                    // - zero (null entries, start of array)
135                    // Any other length indicates invalid data
136                    if start_idx.is_some() && row_len != self.fixed_size {
137                        return Err(general_err!(
138                            "Encountered misaligned row with length {} (expected length {})",
139                            row_len,
140                            self.fixed_size
141                        ));
142                    }
143                    row_len = 0;
144
145                    if *d >= self.def_level {
146                        row_len += 1;
147
148                        // Valid list entry
149                        if let Some(validity) = validity.as_mut() {
150                            validity.append(true);
151                        }
152                        // Start a run of valid rows if not already inside of one
153                        start_idx.get_or_insert(child_idx);
154                    } else {
155                        // Null list entry
156
157                        if let Some(start) = start_idx.take() {
158                            // Flush pending child items
159                            child_data_builder.extend(0, start, child_idx);
160                        }
161                        // Pad list with nulls
162                        child_data_builder.extend_nulls(self.fixed_size);
163
164                        if let Some(validity) = validity.as_mut() {
165                            // Valid if empty list
166                            validity.append(*d + 1 == self.def_level);
167                        }
168                    }
169                    child_idx += 1;
170                }
171            }
172            Ok(())
173        })?;
174
175        let child_data = match start_idx {
176            Some(0) => {
177                // No null entries - can reuse original array
178                next_batch_array.to_data()
179            }
180            Some(start) => {
181                // Flush pending child items
182                child_data_builder.extend(0, start, child_idx);
183                child_data_builder.freeze()
184            }
185            None => child_data_builder.freeze(),
186        };
187
188        // Verify total number of elements is aligned with fixed list size
189        if list_len * self.fixed_size != child_data.len() {
190            return Err(general_err!(
191                "fixed-size list length must be a multiple of {} but array contains {} elements",
192                self.fixed_size,
193                child_data.len()
194            ));
195        }
196
197        let mut list_builder = ArrayData::builder(self.get_data_type().clone())
198            .len(list_len)
199            .add_child_data(child_data);
200
201        if let Some(builder) = validity {
202            list_builder = list_builder.null_bit_buffer(Some(builder.into()));
203        }
204
205        let list_data = unsafe { list_builder.build_unchecked() };
206
207        let result_array = FixedSizeListArray::from(list_data);
208        Ok(Arc::new(result_array))
209    }
210
211    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
212        self.item_reader.skip_records(num_records)
213    }
214
215    fn get_def_levels(&self) -> Option<&[i16]> {
216        self.item_reader.get_def_levels()
217    }
218
219    fn get_rep_levels(&self) -> Option<&[i16]> {
220        self.item_reader.get_rep_levels()
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::{
        array_reader::{test_util::InMemoryArrayReader, ListArrayReader},
        arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader},
        ArrowWriter,
    };
    use arrow::datatypes::{Field, Int32Type};
    use arrow_array::{
        builder::{FixedSizeListBuilder, Int32Builder, ListBuilder},
        cast::AsArray,
        FixedSizeListArray, ListArray, PrimitiveArray, RecordBatch,
    };
    use arrow_buffer::Buffer;
    use arrow_data::ArrayDataBuilder;
    use arrow_schema::Schema;
    use bytes::Bytes;

    /// Fixed-size list type holding `size` nullable `Int32` items per row.
    fn int32_list_type(size: i32) -> ArrowType {
        ArrowType::FixedSizeList(
            Arc::new(Field::new_list_field(ArrowType::Int32, true)),
            size,
        )
    }

    /// Views `array` as a [`FixedSizeListArray`], panicking on any other type.
    fn as_fixed_size_list(array: &ArrayRef) -> &FixedSizeListArray {
        array
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .unwrap()
    }

    /// Data shared by the Parquet round-trip tests:
    ///
    /// [
    ///   [1, 2, 3, null],
    ///   [5, 6, 7, 8],
    ///   null,
    ///   [9, null, 11, 12],
    ///   [null, null, null, null],
    /// ]
    fn round_trip_list() -> FixedSizeListArray {
        FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
            vec![
                Some(vec![Some(1), Some(2), Some(3), None]),
                Some(vec![Some(5), Some(6), Some(7), Some(8)]),
                None,
                Some(vec![Some(9), None, Some(11), Some(12)]),
                Some(vec![None, None, None, None]),
            ],
            4,
        )
    }

    /// Encodes `batch` as an in-memory Parquet file with default writer properties.
    fn write_parquet(schema: Arc<Schema>, batch: &RecordBatch) -> Bytes {
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)
            .expect("unable to create parquet writer");
        writer.write(batch).expect("unable to write record batch");
        writer.close().expect("unable to close parquet writer");
        Bytes::from(buffer)
    }

    #[test]
    fn test_nullable_list() {
        // Rows: [null, [1, null, 2], null, [3, 4, 5], [null, null, null]]
        let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
            vec![
                None,
                Some([Some(1), None, Some(2)]),
                None,
                Some([Some(3), Some(4), Some(5)]),
                Some([None, None, None]),
            ],
            3,
        );

        let leaf_values = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            None,
            Some(1),
            None,
            Some(2),
            None,
            Some(3),
            Some(4),
            Some(5),
            None,
            None,
            None,
        ]));
        let leaf_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            leaf_values,
            Some(vec![0, 3, 2, 3, 0, 3, 3, 3, 2, 2, 2]),
            Some(vec![0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1]),
        );

        let mut reader = FixedSizeListArrayReader::new(
            Box::new(leaf_reader),
            3,
            int32_list_type(3),
            2,
            1,
            true,
        );
        let batch = reader.next_batch(1024).unwrap();
        assert_eq!(&expected, as_fixed_size_list(&batch))
    }

    #[test]
    fn test_required_list() {
        // Rows: [[1, null], [2, 3], [null, null], [4, 5]]
        let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
            vec![
                Some([Some(1), None]),
                Some([Some(2), Some(3)]),
                Some([None, None]),
                Some([Some(4), Some(5)]),
            ],
            2,
        );

        let leaf_values = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            Some(1),
            None,
            Some(2),
            Some(3),
            None,
            None,
            Some(4),
            Some(5),
        ]));
        let leaf_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            leaf_values,
            Some(vec![2, 1, 2, 2, 1, 1, 2, 2]),
            Some(vec![0, 1, 0, 1, 0, 1, 0, 1]),
        );

        let mut reader = FixedSizeListArrayReader::new(
            Box::new(leaf_reader),
            2,
            int32_list_type(2),
            1,
            1,
            false,
        );
        let batch = reader.next_batch(1024).unwrap();
        assert_eq!(&expected, as_fixed_size_list(&batch))
    }

    #[test]
    fn test_nested_list() {
        // Rows:
        // [
        //   null,
        //   [[1, 2]],
        //   [[null, 3]],
        //   null,
        //   [[4, 5]],
        //   [[null, null]],
        // ]
        let inner_type = int32_list_type(2);
        let outer_type = ArrowType::FixedSizeList(
            Arc::new(Field::new_list_field(inner_type.clone(), false)),
            1,
        );

        // Expected leaf values, including the padding under the null rows
        let expected_leaves = PrimitiveArray::<Int32Type>::from(vec![
            None,
            None,
            Some(1),
            Some(2),
            None,
            Some(3),
            None,
            None,
            Some(4),
            Some(5),
            None,
            None,
        ]);

        let expected_inner = ArrayDataBuilder::new(inner_type.clone())
            .len(6)
            .add_child_data(expected_leaves.into_data())
            .build()
            .unwrap();

        let expected_outer = ArrayDataBuilder::new(outer_type.clone())
            .len(6)
            .add_child_data(expected_inner)
            .null_bit_buffer(Some(Buffer::from([0b110110])))
            .build()
            .unwrap();

        let expected = FixedSizeListArray::from(expected_outer);

        let leaf_values = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            None,
            Some(1),
            Some(2),
            None,
            Some(3),
            None,
            Some(4),
            Some(5),
            None,
            None,
        ]));

        let leaf_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            leaf_values,
            Some(vec![0, 5, 5, 4, 5, 0, 5, 5, 4, 4]),
            Some(vec![0, 0, 2, 0, 2, 0, 0, 2, 0, 2]),
        );

        let inner_reader =
            FixedSizeListArrayReader::new(Box::new(leaf_reader), 2, inner_type, 4, 2, false);
        let mut outer_reader =
            FixedSizeListArrayReader::new(Box::new(inner_reader), 1, outer_type, 3, 1, true);

        // Read in two batches: the first two rows, then the rest
        let first_expected = expected.slice(0, 2);
        let rest_expected = expected.slice(2, 4);

        let first = outer_reader.next_batch(2).unwrap();
        assert_eq!(first.as_ref(), &first_expected);

        let rest = outer_reader.next_batch(1024).unwrap();
        assert_eq!(rest.as_ref(), &rest_expected);
    }

    #[test]
    fn test_empty_list() {
        // Rows: [null, [], null, []]
        let expected = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
            vec![None, Some([]), None, Some([])],
            0,
        );

        let leaf_values = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            None, None, None, None,
        ]));
        let leaf_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            leaf_values,
            Some(vec![0, 1, 0, 1]),
            Some(vec![0, 0, 0, 0]),
        );

        let mut reader = FixedSizeListArrayReader::new(
            Box::new(leaf_reader),
            0,
            int32_list_type(0),
            2,
            1,
            true,
        );
        let batch = reader.next_batch(1024).unwrap();
        assert_eq!(&expected, as_fixed_size_list(&batch))
    }

    #[test]
    fn test_nested_var_list() {
        // Rows: [[[1, null, 3], null], [[4], []], [[5, 6], [null, null]], null]
        let mut expected_builder =
            FixedSizeListBuilder::new(ListBuilder::new(Int32Builder::new()), 2);
        expected_builder
            .values()
            .append_value([Some(1), None, Some(3)]);
        expected_builder.values().append_null();
        expected_builder.append(true);
        expected_builder.values().append_value([Some(4)]);
        expected_builder.values().append_value([]);
        expected_builder.append(true);
        expected_builder.values().append_value([Some(5), Some(6)]);
        expected_builder.values().append_value([None, None]);
        expected_builder.append(true);
        expected_builder.values().append_null();
        expected_builder.values().append_null();
        expected_builder.append(false);
        let expected = expected_builder.finish();

        let leaf_values = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            Some(1),
            None,
            Some(3),
            None,
            Some(4),
            None,
            Some(5),
            Some(6),
            None,
            None,
            None,
        ]));

        let inner_type = ArrowType::List(Arc::new(Field::new_list_field(ArrowType::Int32, true)));
        let outer_type =
            ArrowType::FixedSizeList(Arc::new(Field::new_list_field(inner_type.clone(), true)), 2);

        let leaf_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            leaf_values,
            Some(vec![5, 4, 5, 2, 5, 3, 5, 5, 4, 4, 0]),
            Some(vec![0, 2, 2, 1, 0, 1, 0, 2, 1, 2, 0]),
        );

        let inner_reader =
            ListArrayReader::<i32>::new(Box::new(leaf_reader), inner_type, 4, 2, true);

        let mut outer_reader =
            FixedSizeListArrayReader::new(Box::new(inner_reader), 2, outer_type, 2, 1, true);
        let batch = outer_reader.next_batch(1024).unwrap();
        assert_eq!(&expected, as_fixed_size_list(&batch))
    }

    #[test]
    fn test_read_list_column() {
        // Writes a Parquet file with a fixed-size list column and a primitive
        // column, then reads both columns back and compares them to the input.
        let list = round_trip_list();

        // [null, 2, 3, null, 5]
        let primitive =
            PrimitiveArray::<Int32Type>::from_iter(vec![None, Some(2), Some(3), None, Some(5)]);

        let schema = Arc::new(Schema::new(vec![
            Field::new("list", int32_list_type(4), true),
            Field::new("primitive", ArrowType::Int32, true),
        ]));

        // Assemble both columns into one record batch
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(list.clone()), Arc::new(primitive.clone())],
        )
        .expect("unable to create record batch");

        // Round-trip the batch through Parquet
        let file = write_parquet(schema.clone(), &batch);
        let mut batch_reader = ParquetRecordBatchReader::try_new(file, 1024)
            .expect("unable to create parquet reader");
        let actual = batch_reader
            .next()
            .expect("missing record batch")
            .expect("unable to read record batch");

        // Schema and both columns must survive unchanged
        assert_eq!(schema, actual.schema());
        let actual_list = actual
            .column(0)
            .as_any()
            .downcast_ref::<FixedSizeListArray>()
            .expect("unable to cast array to FixedSizeListArray");
        let actual_primitive = actual.column(1).as_primitive::<Int32Type>();
        assert_eq!(actual_list, &list);
        assert_eq!(actual_primitive, &primitive);
    }

    #[test]
    fn test_read_as_dyn_list() {
        // Fixed-size list data written to Parquet must be readable as a
        // variable-length list when the embedded arrow metadata is skipped.
        let list = round_trip_list();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "list",
            int32_list_type(4),
            true,
        )]));

        // Single-column batch holding the fixed-size list
        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list)]).unwrap();

        let file = write_parquet(schema, &batch);

        // Read back while ignoring the arrow metadata stored in the file
        let mut batch_reader = ArrowReaderBuilder::try_new_with_options(
            file,
            ArrowReaderOptions::new().with_skip_arrow_metadata(true),
        )
        .expect("unable to create reader builder")
        .build()
        .expect("unable to create parquet reader");
        let actual = batch_reader
            .next()
            .expect("missing record batch")
            .expect("unable to read record batch");

        // The column comes back as a variable-length list with the same values
        let col = actual.column(0).as_list::<i32>();
        let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), Some(3), None]),
            Some(vec![Some(5), Some(6), Some(7), Some(8)]),
            None,
            Some(vec![Some(9), None, Some(11), Some(12)]),
            Some(vec![None, None, None, None]),
        ]);
        assert_eq!(col, &expected);
    }
}
644}