parquet/arrow/array_reader/
struct_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::ArrayReader;
19use crate::errors::{ParquetError, Result};
20use arrow_array::{builder::BooleanBufferBuilder, Array, ArrayRef, StructArray};
21use arrow_data::{ArrayData, ArrayDataBuilder};
22use arrow_schema::DataType as ArrowType;
23use std::any::Any;
24use std::sync::Arc;
25
26/// Implementation of struct array reader.
27pub struct StructArrayReader {
28    children: Vec<Box<dyn ArrayReader>>,
29    data_type: ArrowType,
30    struct_def_level: i16,
31    struct_rep_level: i16,
32    nullable: bool,
33}
34
35impl StructArrayReader {
36    /// Construct struct array reader.
37    pub fn new(
38        data_type: ArrowType,
39        children: Vec<Box<dyn ArrayReader>>,
40        def_level: i16,
41        rep_level: i16,
42        nullable: bool,
43    ) -> Self {
44        Self {
45            data_type,
46            children,
47            struct_def_level: def_level,
48            struct_rep_level: rep_level,
49            nullable,
50        }
51    }
52}
53
54impl ArrayReader for StructArrayReader {
55    fn as_any(&self) -> &dyn Any {
56        self
57    }
58
59    /// Returns data type.
60    /// This must be a struct.
61    fn get_data_type(&self) -> &ArrowType {
62        &self.data_type
63    }
64
65    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
66        let mut read = None;
67        for child in self.children.iter_mut() {
68            let child_read = child.read_records(batch_size)?;
69            match read {
70                Some(expected) => {
71                    if expected != child_read {
72                        return Err(general_err!(
73                            "StructArrayReader out of sync in read_records, expected {} read, got {}",
74                            expected,
75                            child_read
76                        ));
77                    }
78                }
79                None => read = Some(child_read),
80            }
81        }
82        Ok(read.unwrap_or(0))
83    }
84
85    /// Consume struct records.
86    ///
87    /// Definition levels of struct array is calculated as following:
88    /// ```ignore
89    /// def_levels[i] = min(child1_def_levels[i], child2_def_levels[i], ...,
90    /// childn_def_levels[i]);
91    /// ```
92    ///
93    /// Repetition levels of struct array is calculated as following:
94    /// ```ignore
95    /// rep_levels[i] = child1_rep_levels[i];
96    /// ```
97    ///
98    /// The null bitmap of struct array is calculated from def_levels:
99    /// ```ignore
100    /// null_bitmap[i] = (def_levels[i] >= self.def_level);
101    /// ```
102    ///
103    fn consume_batch(&mut self) -> Result<ArrayRef> {
104        if self.children.is_empty() {
105            return Ok(Arc::new(StructArray::from(Vec::new())));
106        }
107
108        let children_array = self
109            .children
110            .iter_mut()
111            .map(|reader| reader.consume_batch())
112            .collect::<Result<Vec<_>>>()?;
113
114        // check that array child data has same size
115        let children_array_len = children_array
116            .first()
117            .map(|arr| arr.len())
118            .ok_or_else(|| general_err!("Struct array reader should have at least one child!"))?;
119
120        let all_children_len_eq = children_array
121            .iter()
122            .all(|arr| arr.len() == children_array_len);
123        if !all_children_len_eq {
124            return Err(general_err!("Not all children array length are the same!"));
125        }
126
127        // Now we can build array data
128        let mut array_data_builder = ArrayDataBuilder::new(self.data_type.clone())
129            .len(children_array_len)
130            .child_data(
131                children_array
132                    .iter()
133                    .map(|x| x.to_data())
134                    .collect::<Vec<ArrayData>>(),
135            );
136
137        if self.nullable {
138            // calculate struct def level data
139
140            // children should have consistent view of parent, only need to inspect first child
141            let def_levels = self.children[0]
142                .get_def_levels()
143                .expect("child with nullable parents must have definition level");
144
145            // calculate bitmap for current array
146            let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);
147
148            match self.children[0].get_rep_levels() {
149                Some(rep_levels) => {
150                    // Sanity check
151                    assert_eq!(rep_levels.len(), def_levels.len());
152
153                    for (rep_level, def_level) in rep_levels.iter().zip(def_levels) {
154                        if rep_level > &self.struct_rep_level {
155                            // Already handled by inner list - SKIP
156                            continue;
157                        }
158                        bitmap_builder.append(*def_level >= self.struct_def_level)
159                    }
160                }
161                None => {
162                    for def_level in def_levels {
163                        bitmap_builder.append(*def_level >= self.struct_def_level)
164                    }
165                }
166            }
167
168            if bitmap_builder.len() != children_array_len {
169                return Err(general_err!("Failed to decode level data for struct array"));
170            }
171
172            array_data_builder = array_data_builder.null_bit_buffer(Some(bitmap_builder.into()));
173        }
174
175        let array_data = unsafe { array_data_builder.build_unchecked() };
176        Ok(Arc::new(StructArray::from(array_data)))
177    }
178
179    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
180        let mut skipped = None;
181        for child in self.children.iter_mut() {
182            let child_skipped = child.skip_records(num_records)?;
183            match skipped {
184                Some(expected) => {
185                    if expected != child_skipped {
186                        return Err(general_err!(
187                            "StructArrayReader out of sync, expected {} skipped, got {}",
188                            expected,
189                            child_skipped
190                        ));
191                    }
192                }
193                None => skipped = Some(child_skipped),
194            }
195        }
196        Ok(skipped.unwrap_or(0))
197    }
198
199    fn get_def_levels(&self) -> Option<&[i16]> {
200        // Children definition levels should describe the same
201        // parent structure, so return first child's
202        self.children.first().and_then(|l| l.get_def_levels())
203    }
204
205    fn get_rep_levels(&self) -> Option<&[i16]> {
206        // Children definition levels should describe the same
207        // parent structure, so return first child's
208        self.children.first().and_then(|l| l.get_rep_levels())
209    }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::test_util::InMemoryArrayReader;
    use crate::arrow::array_reader::ListArrayReader;
    use arrow::buffer::Buffer;
    use arrow::datatypes::Field;
    use arrow_array::cast::AsArray;
    use arrow_array::{Array, Int32Array, ListArray};
    use arrow_schema::Fields;

    /// A nullable struct over two Int32 children: validity and the exposed
    /// levels must come from the first child.
    #[test]
    fn test_struct_array_reader() {
        let f1_values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let f2_values = Arc::new(Int32Array::from(vec![5, 4, 3, 2, 1]));

        let struct_type = ArrowType::Struct(Fields::from(vec![
            Field::new("f1", f1_values.data_type().clone(), true),
            Field::new("f2", f2_values.data_type().clone(), true),
        ]));

        let f1_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            f1_values.clone(),
            Some(vec![0, 1, 2, 3, 1]),
            Some(vec![0, 1, 1, 1, 1]),
        );
        let f2_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            f2_values.clone(),
            Some(vec![0, 1, 3, 1, 2]),
            Some(vec![0, 1, 1, 1, 1]),
        );

        let mut reader = StructArrayReader::new(
            struct_type,
            vec![Box::new(f1_reader), Box::new(f2_reader)],
            1,
            1,
            true,
        );

        let batch = reader.next_batch(5).unwrap();
        let struct_array = batch.as_struct();
        assert_eq!(5, struct_array.len());

        // Only the first slot has a definition level below the struct's level.
        let null_flags: Vec<bool> = (0..5).map(|idx| struct_array.is_null(idx)).collect();
        assert_eq!(vec![true, false, false, false, false], null_flags);

        let expected_def_levels: &[i16] = &[0, 1, 2, 3, 1];
        assert_eq!(Some(expected_def_levels), reader.get_def_levels());

        let expected_rep_levels: &[i16] = &[0, 1, 1, 1, 1];
        assert_eq!(Some(expected_rep_levels), reader.get_rep_levels());
    }

    /// A nullable struct wrapping a nullable list child.
    #[test]
    fn test_struct_array_reader_list() {
        use arrow::datatypes::Int32Type;
        // [
        //    {foo: [1, 2, null]},
        //    {foo: []},
        //    {foo: null},
        //    null,
        // ]

        let expected_l = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), None]),
            Some(vec![]),
            None,
            None,
        ]));

        // First three struct slots are valid, the fourth is null.
        let validity = Buffer::from([0b00000111]);
        let foo_field = Arc::new(Field::new("foo", expected_l.data_type().clone(), true));
        let struct_fields = vec![(foo_field, expected_l.clone() as ArrayRef)];
        let expected = StructArray::from((struct_fields, validity));

        let leaf_values = Arc::new(Int32Array::from_iter(vec![
            Some(1),
            Some(2),
            None,
            None,
            None,
            None,
        ]));
        let leaf_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            leaf_values,
            Some(vec![4, 4, 3, 2, 1, 0]),
            Some(vec![0, 1, 1, 0, 0, 0]),
        );

        let list_reader = ListArrayReader::<i32>::new(
            Box::new(leaf_reader),
            expected_l.data_type().clone(),
            3,
            1,
            true,
        );

        let mut struct_reader = StructArrayReader::new(
            expected.data_type().clone(),
            vec![Box::new(list_reader)],
            1,
            0,
            true,
        );

        let batch = struct_reader.next_batch(1024).unwrap();
        let actual = batch.as_struct();
        assert_eq!(actual, &expected);
    }
}
333}