Skip to main content

parquet/arrow/array_reader/
struct_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::ArrayReader;
19use crate::errors::{ParquetError, Result};
20use arrow_array::{Array, ArrayRef, StructArray, builder::BooleanBufferBuilder};
21use arrow_buffer::NullBuffer;
22use arrow_schema::{DataType as ArrowType, DataType};
23use std::any::Any;
24use std::sync::Arc;
25
26/// Implementation of struct array reader.
27pub struct StructArrayReader {
28    children: Vec<Box<dyn ArrayReader>>,
29    data_type: ArrowType,
30    struct_def_level: i16,
31    struct_rep_level: i16,
32    nullable: bool,
33}
34
35impl StructArrayReader {
36    /// Construct struct array reader.
37    pub fn new(
38        data_type: ArrowType,
39        children: Vec<Box<dyn ArrayReader>>,
40        def_level: i16,
41        rep_level: i16,
42        nullable: bool,
43    ) -> Self {
44        Self {
45            data_type,
46            children,
47            struct_def_level: def_level,
48            struct_rep_level: rep_level,
49            nullable,
50        }
51    }
52}
53
54impl ArrayReader for StructArrayReader {
55    fn as_any(&self) -> &dyn Any {
56        self
57    }
58
59    /// Returns data type.
60    /// This must be a struct.
61    fn get_data_type(&self) -> &ArrowType {
62        &self.data_type
63    }
64
65    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
66        let mut read = None;
67        for child in self.children.iter_mut() {
68            let child_read = child.read_records(batch_size)?;
69            match read {
70                Some(expected) => {
71                    if expected != child_read {
72                        return Err(general_err!(
73                            "StructArrayReader out of sync in read_records, expected {} read, got {}",
74                            expected,
75                            child_read
76                        ));
77                    }
78                }
79                None => read = Some(child_read),
80            }
81        }
82        Ok(read.unwrap_or(0))
83    }
84
85    /// Consume struct records.
86    ///
87    /// Definition levels of struct array is calculated as following:
88    /// ```ignore
89    /// def_levels[i] = min(child1_def_levels[i], child2_def_levels[i], ...,
90    /// childn_def_levels[i]);
91    /// ```
92    ///
93    /// Repetition levels of struct array is calculated as following:
94    /// ```ignore
95    /// rep_levels[i] = child1_rep_levels[i];
96    /// ```
97    ///
98    /// The null bitmap of struct array is calculated from def_levels:
99    /// ```ignore
100    /// null_bitmap[i] = (def_levels[i] >= self.def_level);
101    /// ```
102    ///
103    fn consume_batch(&mut self) -> Result<ArrayRef> {
104        if self.children.is_empty() {
105            return Ok(Arc::new(StructArray::from(Vec::new())));
106        }
107
108        let children_array = self
109            .children
110            .iter_mut()
111            .map(|reader| reader.consume_batch())
112            .collect::<Result<Vec<_>>>()?;
113
114        // check that array child data has same size
115        let children_array_len = children_array
116            .first()
117            .map(|arr| arr.len())
118            .ok_or_else(|| general_err!("Struct array reader should have at least one child!"))?;
119
120        let all_children_len_eq = children_array
121            .iter()
122            .all(|arr| arr.len() == children_array_len);
123        if !all_children_len_eq {
124            return Err(general_err!("Not all children array length are the same!"));
125        }
126
127        let DataType::Struct(fields) = &self.data_type else {
128            return Err(general_err!(
129                "Internal: StructArrayReader must have struct data type, got {:?}",
130                self.data_type
131            ));
132        };
133        let fields = fields.clone(); // cloning Fields is cheap (Arc internally)
134
135        let mut nulls = None;
136        if self.nullable {
137            // calculate struct def level data
138
139            // children should have consistent view of parent, only need to inspect first child
140            let def_levels = self.children[0]
141                .get_def_levels()
142                .expect("child with nullable parents must have definition level");
143
144            // calculate bitmap for current array
145            let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);
146
147            match self.children[0].get_rep_levels() {
148                Some(rep_levels) => {
149                    // Sanity check
150                    assert_eq!(rep_levels.len(), def_levels.len());
151
152                    for (rep_level, def_level) in rep_levels.iter().zip(def_levels) {
153                        if rep_level > &self.struct_rep_level {
154                            // Already handled by inner list - SKIP
155                            continue;
156                        }
157                        bitmap_builder.append(*def_level >= self.struct_def_level)
158                    }
159                }
160                None => {
161                    // Safety: slice iterator has a trusted length
162                    unsafe {
163                        bitmap_builder.extend_trusted_len(
164                            def_levels
165                                .iter()
166                                .map(|level| *level >= self.struct_def_level),
167                        )
168                    }
169                }
170            }
171
172            if bitmap_builder.len() != children_array_len {
173                return Err(general_err!("Failed to decode level data for struct array"));
174            }
175            nulls = Some(NullBuffer::from(bitmap_builder));
176        }
177
178        // Safety: checked above that all children array data have same
179        // length and correct type
180        unsafe {
181            Ok(Arc::new(StructArray::new_unchecked_with_length(
182                fields,
183                children_array,
184                nulls,
185                children_array_len,
186            )))
187        }
188    }
189
190    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
191        let mut skipped = None;
192        for child in self.children.iter_mut() {
193            let child_skipped = child.skip_records(num_records)?;
194            match skipped {
195                Some(expected) => {
196                    if expected != child_skipped {
197                        return Err(general_err!(
198                            "StructArrayReader out of sync, expected {} skipped, got {}",
199                            expected,
200                            child_skipped
201                        ));
202                    }
203                }
204                None => skipped = Some(child_skipped),
205            }
206        }
207        Ok(skipped.unwrap_or(0))
208    }
209
210    fn get_def_levels(&self) -> Option<&[i16]> {
211        // Children definition levels should describe the same
212        // parent structure, so return first child's
213        self.children.first().and_then(|l| l.get_def_levels())
214    }
215
216    fn get_rep_levels(&self) -> Option<&[i16]> {
217        // Children definition levels should describe the same
218        // parent structure, so return first child's
219        self.children.first().and_then(|l| l.get_rep_levels())
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::ListArrayReader;
    use crate::arrow::array_reader::test_util::InMemoryArrayReader;
    use arrow::buffer::Buffer;
    use arrow::datatypes::Field;
    use arrow_array::cast::AsArray;
    use arrow_array::{Array, Int32Array, ListArray};
    use arrow_schema::Fields;

    #[test]
    fn test_struct_array_reader() {
        let array_1 = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let array_reader_1 = InMemoryArrayReader::new(
            ArrowType::Int32,
            array_1.clone(),
            Some(vec![0, 1, 2, 3, 1]),
            Some(vec![0, 1, 1, 1, 1]),
        );

        let array_2 = Arc::new(Int32Array::from(vec![5, 4, 3, 2, 1]));
        let array_reader_2 = InMemoryArrayReader::new(
            ArrowType::Int32,
            array_2.clone(),
            Some(vec![0, 1, 3, 1, 2]),
            Some(vec![0, 1, 1, 1, 1]),
        );

        let struct_type = ArrowType::Struct(Fields::from(vec![
            Field::new("f1", array_1.data_type().clone(), true),
            Field::new("f2", array_2.data_type().clone(), true),
        ]));

        let mut struct_array_reader = StructArrayReader::new(
            struct_type,
            vec![Box::new(array_reader_1), Box::new(array_reader_2)],
            1,
            1,
            true,
        );

        let struct_array = struct_array_reader.next_batch(5).unwrap();
        let struct_array = struct_array.as_struct();

        assert_eq!(5, struct_array.len());
        assert_eq!(
            vec![true, false, false, false, false],
            (0..5)
                .map(|idx| struct_array.is_null(idx))
                .collect::<Vec<bool>>()
        );
        assert_eq!(
            Some(vec![0, 1, 2, 3, 1].as_slice()),
            struct_array_reader.get_def_levels()
        );
        assert_eq!(
            Some(vec![0, 1, 1, 1, 1].as_slice()),
            struct_array_reader.get_rep_levels()
        );
    }

    #[test]
    fn test_struct_array_reader_list() {
        use arrow::datatypes::Int32Type;
        // [
        //    {foo: [1, 2, null],
        //    {foo: []},
        //    {foo: null},
        //    null,
        // ]

        let expected_l = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), None]),
            Some(vec![]),
            None,
            None,
        ]));

        let validity = Buffer::from([0b00000111]);
        let struct_fields = vec![(
            Arc::new(Field::new("foo", expected_l.data_type().clone(), true)),
            expected_l.clone() as ArrayRef,
        )];
        let expected = StructArray::from((struct_fields, validity));

        let array = Arc::new(Int32Array::from_iter(vec![
            Some(1),
            Some(2),
            None,
            None,
            None,
            None,
        ]));
        let reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            array,
            Some(vec![4, 4, 3, 2, 1, 0]),
            Some(vec![0, 1, 1, 0, 0, 0]),
        );

        let list_reader = ListArrayReader::<i32>::new(
            Box::new(reader),
            expected_l.data_type().clone(),
            3,
            1,
            true,
        );

        let mut struct_reader = StructArrayReader::new(
            expected.data_type().clone(),
            vec![Box::new(list_reader)],
            1,
            0,
            true,
        );

        let actual = struct_reader.next_batch(1024).unwrap();
        let actual = actual.as_struct();
        assert_eq!(actual, &expected)
    }

    /// A struct with no children yields an empty array and exposes no levels.
    #[test]
    fn test_struct_array_reader_no_children() {
        let mut reader = StructArrayReader::new(
            ArrowType::Struct(Fields::empty()),
            vec![],
            1,
            0,
            true,
        );

        let batch = reader.next_batch(10).unwrap();
        assert_eq!(0, batch.len());
        assert_eq!(None, reader.get_def_levels());
        assert_eq!(None, reader.get_rep_levels());
    }

    /// A non-nullable struct never builds a validity buffer and passes the
    /// children's arrays through as its columns.
    #[test]
    fn test_struct_array_reader_non_nullable() {
        use arrow::datatypes::Int32Type;

        let array_1 = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let reader_1 = InMemoryArrayReader::new(ArrowType::Int32, array_1.clone(), None, None);

        let array_2 = Arc::new(Int32Array::from(vec![3, 2, 1]));
        let reader_2 = InMemoryArrayReader::new(ArrowType::Int32, array_2.clone(), None, None);

        let struct_type = ArrowType::Struct(Fields::from(vec![
            Field::new("f1", ArrowType::Int32, false),
            Field::new("f2", ArrowType::Int32, false),
        ]));

        let mut reader = StructArrayReader::new(
            struct_type,
            vec![Box::new(reader_1), Box::new(reader_2)],
            0,
            0,
            false,
        );

        let batch = reader.next_batch(1024).unwrap();
        let batch = batch.as_struct();
        assert_eq!(3, batch.len());
        assert!(batch.nulls().is_none());
        assert_eq!(batch.column(0).as_primitive::<Int32Type>(), array_1.as_ref());
        assert_eq!(batch.column(1).as_primitive::<Int32Type>(), array_2.as_ref());
    }
}