parquet/arrow/array_reader/
struct_array.rs1use crate::arrow::array_reader::ArrayReader;
19use crate::errors::{ParquetError, Result};
20use arrow_array::{builder::BooleanBufferBuilder, Array, ArrayRef, StructArray};
21use arrow_data::{ArrayData, ArrayDataBuilder};
22use arrow_schema::DataType as ArrowType;
23use std::any::Any;
24use std::sync::Arc;
25
26pub struct StructArrayReader {
28 children: Vec<Box<dyn ArrayReader>>,
29 data_type: ArrowType,
30 struct_def_level: i16,
31 struct_rep_level: i16,
32 nullable: bool,
33}
34
35impl StructArrayReader {
36 pub fn new(
38 data_type: ArrowType,
39 children: Vec<Box<dyn ArrayReader>>,
40 def_level: i16,
41 rep_level: i16,
42 nullable: bool,
43 ) -> Self {
44 Self {
45 data_type,
46 children,
47 struct_def_level: def_level,
48 struct_rep_level: rep_level,
49 nullable,
50 }
51 }
52}
53
54impl ArrayReader for StructArrayReader {
55 fn as_any(&self) -> &dyn Any {
56 self
57 }
58
59 fn get_data_type(&self) -> &ArrowType {
62 &self.data_type
63 }
64
65 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
66 let mut read = None;
67 for child in self.children.iter_mut() {
68 let child_read = child.read_records(batch_size)?;
69 match read {
70 Some(expected) => {
71 if expected != child_read {
72 return Err(general_err!(
73 "StructArrayReader out of sync in read_records, expected {} read, got {}",
74 expected,
75 child_read
76 ));
77 }
78 }
79 None => read = Some(child_read),
80 }
81 }
82 Ok(read.unwrap_or(0))
83 }
84
85 fn consume_batch(&mut self) -> Result<ArrayRef> {
104 if self.children.is_empty() {
105 return Ok(Arc::new(StructArray::from(Vec::new())));
106 }
107
108 let children_array = self
109 .children
110 .iter_mut()
111 .map(|reader| reader.consume_batch())
112 .collect::<Result<Vec<_>>>()?;
113
114 let children_array_len = children_array
116 .first()
117 .map(|arr| arr.len())
118 .ok_or_else(|| general_err!("Struct array reader should have at least one child!"))?;
119
120 let all_children_len_eq = children_array
121 .iter()
122 .all(|arr| arr.len() == children_array_len);
123 if !all_children_len_eq {
124 return Err(general_err!("Not all children array length are the same!"));
125 }
126
127 let mut array_data_builder = ArrayDataBuilder::new(self.data_type.clone())
129 .len(children_array_len)
130 .child_data(
131 children_array
132 .iter()
133 .map(|x| x.to_data())
134 .collect::<Vec<ArrayData>>(),
135 );
136
137 if self.nullable {
138 let def_levels = self.children[0]
142 .get_def_levels()
143 .expect("child with nullable parents must have definition level");
144
145 let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);
147
148 match self.children[0].get_rep_levels() {
149 Some(rep_levels) => {
150 assert_eq!(rep_levels.len(), def_levels.len());
152
153 for (rep_level, def_level) in rep_levels.iter().zip(def_levels) {
154 if rep_level > &self.struct_rep_level {
155 continue;
157 }
158 bitmap_builder.append(*def_level >= self.struct_def_level)
159 }
160 }
161 None => {
162 for def_level in def_levels {
163 bitmap_builder.append(*def_level >= self.struct_def_level)
164 }
165 }
166 }
167
168 if bitmap_builder.len() != children_array_len {
169 return Err(general_err!("Failed to decode level data for struct array"));
170 }
171
172 array_data_builder = array_data_builder.null_bit_buffer(Some(bitmap_builder.into()));
173 }
174
175 let array_data = unsafe { array_data_builder.build_unchecked() };
176 Ok(Arc::new(StructArray::from(array_data)))
177 }
178
179 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
180 let mut skipped = None;
181 for child in self.children.iter_mut() {
182 let child_skipped = child.skip_records(num_records)?;
183 match skipped {
184 Some(expected) => {
185 if expected != child_skipped {
186 return Err(general_err!(
187 "StructArrayReader out of sync, expected {} skipped, got {}",
188 expected,
189 child_skipped
190 ));
191 }
192 }
193 None => skipped = Some(child_skipped),
194 }
195 }
196 Ok(skipped.unwrap_or(0))
197 }
198
199 fn get_def_levels(&self) -> Option<&[i16]> {
200 self.children.first().and_then(|l| l.get_def_levels())
203 }
204
205 fn get_rep_levels(&self) -> Option<&[i16]> {
206 self.children.first().and_then(|l| l.get_rep_levels())
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::test_util::InMemoryArrayReader;
    use crate::arrow::array_reader::ListArrayReader;
    use arrow::buffer::Buffer;
    use arrow::datatypes::Field;
    use arrow_array::cast::AsArray;
    use arrow_array::{Array, Int32Array, ListArray};
    use arrow_schema::Fields;

    /// Two Int32 children under a nullable struct: validity comes from the
    /// first child's levels, and the reader exposes that child's levels.
    #[test]
    fn test_struct_array_reader() {
        let first_values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let first_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            first_values.clone(),
            Some(vec![0, 1, 2, 3, 1]),
            Some(vec![0, 1, 1, 1, 1]),
        );

        let second_values = Arc::new(Int32Array::from(vec![5, 4, 3, 2, 1]));
        let second_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            second_values.clone(),
            Some(vec![0, 1, 3, 1, 2]),
            Some(vec![0, 1, 1, 1, 1]),
        );

        let fields = vec![
            Field::new("f1", first_values.data_type().clone(), true),
            Field::new("f2", second_values.data_type().clone(), true),
        ];
        let struct_type = ArrowType::Struct(Fields::from(fields));

        // Struct definition level 1, repetition level 1, nullable.
        let mut struct_array_reader = StructArrayReader::new(
            struct_type,
            vec![Box::new(first_reader), Box::new(second_reader)],
            1,
            1,
            true,
        );

        let batch = struct_array_reader.next_batch(5).unwrap();
        let struct_array = batch.as_struct();

        assert_eq!(5, struct_array.len());

        // Only the first slot has a definition level below 1, so only it is null.
        let null_flags: Vec<bool> = (0..5).map(|idx| struct_array.is_null(idx)).collect();
        assert_eq!(vec![true, false, false, false, false], null_flags);

        // The levels reported are those of the first child.
        let expected_def_levels: &[i16] = &[0, 1, 2, 3, 1];
        assert_eq!(
            Some(expected_def_levels),
            struct_array_reader.get_def_levels()
        );
        let expected_rep_levels: &[i16] = &[0, 1, 1, 1, 1];
        assert_eq!(
            Some(expected_rep_levels),
            struct_array_reader.get_rep_levels()
        );
    }

    /// A nullable struct wrapping a single nullable list child.
    #[test]
    fn test_struct_array_reader_list() {
        use arrow::datatypes::Int32Type;

        // Expected list column: a three-item list, an empty list, two nulls.
        let expected_l = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), None]),
            Some(vec![]),
            None,
            None,
        ]));

        // Low three bits set: the first three structs are valid, the fourth is null.
        let validity = Buffer::from([0b00000111]);
        let list_field = Arc::new(Field::new("foo", expected_l.data_type().clone(), true));
        let struct_fields = vec![(list_field, expected_l.clone() as ArrayRef)];
        let expected = StructArray::from((struct_fields, validity));

        // Leaf values and levels fed to the list reader.
        let leaf_values = Arc::new(Int32Array::from_iter(vec![
            Some(1),
            Some(2),
            None,
            None,
            None,
            None,
        ]));
        let leaf_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            leaf_values,
            Some(vec![4, 4, 3, 2, 1, 0]),
            Some(vec![0, 1, 1, 0, 0, 0]),
        );

        let list_reader = ListArrayReader::<i32>::new(
            Box::new(leaf_reader),
            expected_l.data_type().clone(),
            3,
            1,
            true,
        );

        // Struct definition level 1, repetition level 0, nullable.
        let mut struct_reader = StructArrayReader::new(
            expected.data_type().clone(),
            vec![Box::new(list_reader)],
            1,
            0,
            true,
        );

        let batch = struct_reader.next_batch(1024).unwrap();
        let actual = batch.as_struct();
        assert_eq!(actual, &expected)
    }
}